How to optimize memory access pattern / cache misses for this array decimate/downsample program?

1k views Asked by At

I was recently asked about a piece of code to decimate/downsample the array "in-place". This "decimation" function takes an array of ints and stores an entry at an even index i in the array at the index i/2. It does it for all entries in the array.

This would move all even indexed entries in the original array to the first-half of the array. The rest of the array can then be initialized to 0. The overall result is an array that preserved all even index entries in the original array (by moving them to the first-half) and the second-half of the array being 0. This is apparently used to downsample signals in signal processing.

The code looks something like this:

void decimate (vector<int>& a) {
   int sz = a.size();
   for (int i =0; i < sz; i++) {
     if (i%2 == 0) {
        a[i/2] = a[i];
     }
    }
    for (int i =(sz-1)/2; i < sz; i++) a[i] = 0;
}

After suggesting basic improvements that keep certain variables in registers, I can't find any further way to optimize it but not sure if that can't be done.

Are there ways one could optimize the memory access pattern in the loop for better cache performance? Or any other ways to optimize the main copy operations of compressing/down-sampling the array into the first-half ? (e.g. by vectorization for platforms that support it)

   for (int i =0; i < sz; i++) {
     if (i%2 == 0) {
        a[i/2] = a[i];
     }
    }

Are there any loop transformations (such as tiling/strip-mining) that can lead to highly efficient code for such decimate loop?

EDIT: There are a few different ways suggested in the answers below that seem to take advantage of memset/fill or pointer arithmetic to gain speed efficiency. This question is main focused on whether there are well-defined Loop Transformations that can significantly improve locality or cache misses (e.g. if it were a loop-nest with two loops, one could potentially look into loop tiling to optimize cache misses)

5

There are 5 answers

8
Hatatister On

Here is a version using pointer arithmetic and placement new which uses the fact that std::vector uses a continuous memory layout internally:

void down_sample(std::vector<int> & v){ 
    int * begin = &v[0];
    int * stop =  begin + v.size();
    int * position = begin + 2;
    int * half_position = begin +1;
    while( position < stop){
        *half_position = *position;
        ++half_position;
        position += 2;
    }
    size_t size = v.size()/2;
    int * a = new (half_position) int[size]();
}

On my machine this code runs 3 times as fast as yours with disabled optimizations and about 30 % faster than your version when compiled with -o3 on gcc7.2. I tested this with an vector size of 20 000 000 elements.

And I think that in your version line:

for (int i =(sz-1)/2; i < sz; i++) a[i] = 0;

should be

for (int i =(sz-1)/2 + 1; i < sz; i++) a[i] = 0;

otherwise there will be set too many elements to zero.

Taking into account John Zwinck's question I did some quick test with memset and std::fill instead of placement new.

Here are the results:

n = 20000000
compiled with -o0
orginal 0.111396 seconds
mine    0.0327938 seconds
memset  0.0303007 seconds
fill    0.0507268 seconds

compiled with -o3
orginal 0.0181994 seconds
mine    0.014135 seconds
memset  0.0141561 seconds
fill    0.0138893 seconds

n = 2000
compiled with -o0
orginal 3.0119e-05 seconds
mine    9.171e-06 seconds
memset  9.612e-06 seconds
fill    1.3868e-05 seconds

compiled with -o3
orginal 5.404e-06 seconds
mine    2.105e-06 seconds
memset  2.04e-06 seconds
fill    1.955e-06 seconds

n= 500000000 (with -o3)
mine=     0,350732
memeset = 0.349054  
fill =    0.352398

It seems that memset is a little bit faster on large vectors and std::fill a little bit faster on smaller vectors. But the difference is very small.

4
user2807083 On

My version of one pass decimate():

void decimate (std::vector<int>& a) {
    const std::size_t sz = a.size();
    const std::size_t half = sz / 2;

    bool size_even = ((sz % 2) == 0);

    std::size_t index = 2;
    for (; index < half; index += 2) {
        a[index/2] = a[index];
    }

    for (; index < sz; ++index) {
        a[(index+1)/2] = a[index];
        a[index] = 0;
    }

    if (size_even && (half < sz)) {
        a[half] = 0;
    }
}

and tests for it:

#include <vector>
#include <iostream>
#include <cstddef>

void decimate(std::vector<int> &v);

void print(std::vector<int> &a) {
    std::cout << "{";
    bool f = false;

    for(auto i:a) {
        if (f) std::cout << ", ";
        std::cout << i;
        f = true;
    }
    std::cout << "}" << std::endl;
}

void test(std::vector<int> v1, std::vector<int> v2) {
    auto v = v1;
    decimate(v1);

    bool ok = true;

    for(std::size_t i = 0; i < v1.size(); ++i) {
        ok = (ok && (v1[i] == v2[i]));
    }

    if (ok) {
        print(v);
        print(v1);
    } else {
        print(v);
        print(v1);
        print(v2);
    }
    std::cout << "--------- " << (ok?"ok":"fail") << "\n" << std::endl;
}

int main(int, char**)
{
    test({},
        {});

    test({1},
        {1});

    test({1, 2},
        {1, 0});

    test({1, 2, 3},
        {1, 3, 0});

    test({1, 2, 3, 4},
        {1, 3, 0, 0});

    test({1, 2, 3, 4, 5},
        {1, 3, 5, 0, 0});

    test({1, 2, 3, 4, 5, 6},
        {1, 3, 5, 0, 0, 0});

    test({1, 2, 3, 4, 5, 6, 7},
        {1, 3, 5, 7, 0, 0, 0});

    test({1, 2, 3, 4, 5, 6, 7, 8},
        {1, 3, 5, 7, 0, 0, 0, 0});

    test({1, 2, 3, 4, 5, 6, 7, 8, 9},
        {1, 3, 5, 7, 9, 0, 0, 0, 0});

    test({1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
        {1, 3, 5, 7, 9, 0, 0, 0, 0, 0});

    test({1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
        {1, 3, 5, 7, 9, 11, 0, 0, 0, 0, 0});

    return 0;
}
3
John Zwinck On

You have an array like this:

0 1 2 3 4 5 6 7 8 9

You want to end up with this:

0 2 4 6 8 0 0 0 0 0

I'd do it this way:

void decimate (vector<int>& a) {
  size_t slow = 1, fast = 2;

  // read the first half, write the first quarter
  size_t stop = (a.size()+1)/2;
  while (fast < stop) {
    a[slow++] = a[fast];
    fast += 2;
  }

  // read and clear the second half, write the second quarter
  stop = a.size();
  while (fast < stop) {
    a[slow++] = a[fast];
    a[fast++] = 0;
    a[fast++] = 0;
  }

  // clean up (only really needed when length is even)
  a[slow] = 0;
}

On my system, this is roughly 20% faster than your original version.

Now it's up to you to test and let us know if it's faster on your system!

1
schorsch312 On

Don't go up to sz, if you set it to zero afterwards.

If sz is even goto sz/2, if not to (sz-1)/2.

for (int i =0; i < sz_half; i++) 
        a[i] = a[2*i];
1
schorsch312 On

I compared all the answers given here. I used the intel compiler icc version 15.0.3. Optimization level O3 was used.

Orig: Time difference [micro s] = 79506
JohnZwinck: Time difference [micro s] = 69127   
Hatatister: Time difference [micro s] = 79838
user2807083: Time difference [micro s] = 80000
Schorsch312: Time difference [micro s] = 84491

All times refer to a vector with length 100000000.

#include <vector>
#include <cstddef>
#include <iostream>
#include <chrono>

const int MAX = 100000000;

void setup(std::vector<int> & v){
    for (int i = 0 ; i< MAX; i++) {
        v.push_back(i);
    }
}


void checkResult(std::vector<int> & v) {
    int half_length;
    if (MAX%2==0)
        half_length = MAX/2;
    else
        half_length = MAX-1/2;

    for (int i = 0 ; i< half_length; i++) {
        if (v[i] != i*2)
            std::cout << "Error: v[i]="  << v[i] << " but should be "  <<     2*i <<  "\n";
    }

    for (int i = half_length+1; i< MAX; i++) {
        if (v[i] != 0)
            std::cout << "Error: v[i]="  << v[i] << " but should be 0 \n";
    }
}

void down_sample(){
    std::vector<int> v;
    setup(v);

    auto start_time = std::chrono::steady_clock::now();

    int * begin = &v[0];
    int * stop =  begin + v.size();
    int * position = begin + 2;
    int * half_position = begin +1;
    while( position < stop){
        *half_position = *position;
        ++half_position;
        position += 2;
    }
    size_t size = v.size()/2;
    int * a = new (half_position) int[size]();

    auto duration = std::chrono::steady_clock::now() - start_time;
    std::cout << "Orig: Time difference [micro s] = " << std::chrono::duration_cast<std::chrono::microseconds>(duration).count() <<std::endl;
    checkResult(v);
}

void down_sample_JohnZwinck () {
    std::vector<int> v;
    setup(v);

    auto start_time = std::chrono::steady_clock::now();

    size_t slow = 1, fast = 2;

    // read the first half, write the first quarter
    size_t stop = (v.size()+1)/2;
    while (fast < stop) {
        v[slow++] = v[fast];
        fast += 2;
    }

    // read and clear the second half, write the second quarter
    stop = v.size();
    while (fast < stop) {
        v[slow++] = v[fast];
        v[fast++] = 0;
        v[fast++] = 0;
    }

    // clean up (only really needed when length is even)
    v[slow] = 0;

    auto duration = std::chrono::steady_clock::now() - start_time;
    std::cout << "JohnZwinck: Time difference [micro s] = " << std::chrono::duration_cast<std::chrono::microseconds>(duration).count() <<std::endl;
    checkResult(v);

}

void down_sample_Schorsch312(){ 
    std::vector<int> v;
    setup(v);

    auto start_time = std::chrono::steady_clock::now();
    int half_length;

    if (v.size()%2==0)
        half_length = MAX/2;
    else
        half_length = MAX-1/2;

    for (int i=0; i < half_length; i++) 
        v[i] = v[2*i];
    for (int i=half_length+1; i< MAX; i++) 
        v[i]=0;

    auto duration = std::chrono::steady_clock::now() - start_time;
std::cout << "Schorsch312: Time difference [micro s] = " << std::chrono::duration_cast<std::chrono::microseconds>(duration).count() <<std::endl;
}

void down_sample_Hatatister(){ 
    std::vector<int> v;
    setup(v);

    auto start_time = std::chrono::steady_clock::now();

    int * begin = &v[0];
    int * stop =  begin + v.size();
    int * position = begin + 2;
    int * half_position = begin +1;

    while( position < stop){
        *half_position = *position;
        ++half_position;
        position += 2;
    }
    size_t size = v.size()/2;
    int * a = new (half_position) int[size]();
    auto duration = std::chrono::steady_clock::now() - start_time;
    std::cout << "Hatatister: Time difference [micro s] = " << std::chrono::duration_cast<std::chrono::microseconds>(duration).count() <<std::endl;

    checkResult(v);
}

void down_sample_user2807083 () {
    std::vector<int> v;
    setup(v);

    auto start_time = std::chrono::steady_clock::now();

    const std::size_t sz = v.size();
    const std::size_t half = sz / 2;

    bool size_even = ((sz % 2) == 0);

    std::size_t index = 2;

    for (; index < half; index += 2) {
        v[index/2] = v[index];
    }

    for (; index < sz; ++index) {
        v[(index+1)/2] = v[index];
        v[index] = 0;
    }

    if (size_even && (half < sz)) {
        v[half] = 0;
    }
    auto duration = std::chrono::steady_clock::now() - start_time;
    std::cout << "user2807083: Time difference [micro s] = " << std::chrono::duration_cast<std::chrono::microseconds>(duration).count() <<std::endl;

    checkResult(v);

}

int main () {
    down_sample();
    down_sample_JohnZwinck ();
    down_sample_Schorsch312();
    down_sample_Hatatister();
    down_sample_user2807083();
}