I have a fairly simple loop:
auto indexRecord = getRowPointer(0);
bool equals;
// recordCount is about 6 000 000
for (int i = 0; i < recordCount; ++i) {
equals = BitString::equals(SelectMask, indexRecord, maxBytesValue);
rowsFound += equals;
indexRecord += byteSize; // byteSize is 7
}
Where BitString::equals
is:
static inline bool equals(const char * mask, const char * record, uint64_t maxVal) {
return !(((*( uint64_t * ) mask) & (maxVal & *( uint64_t * ) record)) ^ (maxVal & *( uint64_t * ) record));
}
This code is used to simulate a Bitmap Index querying in databases.
My question is, if there's a way to vectorize the loop, going through all the records.
When trying to compile with GCC and -fopt-info-vec-missed -O3
I am getting: missed: couldn't vectorize loop
.
I am new to this kind of optimizations and would like to learn more, it just feels like I am missing something.
EDIT
First of all, thank you all for answers. I should've included a Reprex.
Here it is now, with all functionality needed, as close as possible I could've done. All of this is done on x86-64
platform and I have both GCC and Clang available.
#include <iostream>
#include <cstdio>
#include <cstring>
#include <cstdint>
#include <bitset>
#include <ctime>
#include <cstdlib>
constexpr short BYTE_SIZE = 8;
class BitString {
public:
static int getByteSizeFromBits(int bitSize) {
return (bitSize + BYTE_SIZE - 1) / BYTE_SIZE;
}
static void setBitString(char *rec, int bitOffset) {
rec[bitOffset / 8] |= (1 << (bitOffset % BYTE_SIZE));
}
static inline bool equals(const char *mask, const char *record, uint64_t maxVal) {
return !(((*(uint64_t *) mask) & (maxVal & *(uint64_t *) record)) ^ (maxVal & *(uint64_t *) record));
}
};
// Class representing a table schema
class TableSchema {
public:
// number of attributes of a table
unsigned int attrs_count = -1;
// the attribute size in bytes, eg. 3 equals to something like CHAR(3) in SQL
unsigned int *attr_sizes = nullptr;
// max value (domain) of an attribute, -1 for unlimited, ()
int *attr_max_values = nullptr;
// the offset of each attribute, to simplify some pointer arithmetic for further use
unsigned int *attribute_offsets = nullptr;
// sum of attr_sizes if the record size;
unsigned int record_size = -1;
void calculate_offsets() {
if (attrs_count <= 0 || attribute_offsets != nullptr) {
return;
}
attribute_offsets = new unsigned int[attrs_count];
int offset = 0;
for (int i = 0; i < attrs_count; ++i) {
attribute_offsets[i] = offset;
offset += attr_sizes[i];
}
record_size = offset;
}
TableSchema() = default;
~TableSchema() {
if (attribute_offsets != nullptr) {
delete[] attribute_offsets;
attribute_offsets = nullptr;
}
attrs_count = -1;
}
};
class BitmapIndex {
private:
char *mData = nullptr;
short bitSize = 0;
int byteSize = 0;
int attrsCount = 0;
int *attrsMaxValue = nullptr;
int *bitIndexAttributeOffset = nullptr;
unsigned int recordCount = 0;
char *SelectMask;
unsigned int capacity = 0;
inline char *getRowPointer(unsigned int rowId) const {
return mData + rowId * byteSize;
}
inline bool shouldColBeIndexed(int max_col_value) const {
return max_col_value > 0;
}
public:
BitmapIndex(const int *attrs_max_value, int attrs_count, unsigned int capacity) {
auto maxValuesSum = 0;
attrsMaxValue = new int[attrs_count];
attrsCount = attrs_count;
bitIndexAttributeOffset = new int[attrs_count];
auto bitOffset = 0;
// attribute's max value is the same as number of bits used to encode the current value
// e.g., if attribute's max value is 3, we use 001 to represent value 1, 010 for 2, 100 for 3 and so on
for (int i = 0; i < attrs_count; ++i) {
attrsMaxValue[i] = attrs_max_value[i];
bitIndexAttributeOffset[i] = bitOffset;
// col is indexed only if it's max value is > 0, -1 means
if (!shouldColBeIndexed(attrs_max_value[i]))
continue;
maxValuesSum += attrs_max_value[i];
bitOffset += attrs_max_value[i];
}
bitSize = (short) maxValuesSum;
byteSize = BitString::getByteSizeFromBits(bitSize);
mData = new char[byteSize * capacity];
memset(mData, 0, byteSize * capacity);
SelectMask = new char[byteSize];
this->capacity = capacity;
}
~BitmapIndex() {
if (mData != nullptr) {
delete[] mData;
mData = nullptr;
delete[] attrsMaxValue;
attrsMaxValue = nullptr;
delete[] SelectMask;
SelectMask = nullptr;
}
}
unsigned long getTotalByteSize() const {
return byteSize * capacity;
}
// add record to index
void addRecord(const char * record, const unsigned int * attribute_sizes) {
auto indexRecord = getRowPointer(recordCount);
unsigned int offset = 0;
for (int j = 0; j < attrsCount; ++j) {
if (attrsMaxValue[j] != -1) {
// byte col value
char colValue = *(record + offset);
if (colValue > attrsMaxValue[j]) {
throw std::runtime_error("Col value is bigger than max allowed value!");
}
// printf("%d ", colValue);
BitString::setBitString(indexRecord, bitIndexAttributeOffset[j] + colValue);
}
offset += attribute_sizes[j];
}
recordCount += 1;
}
// SELECT COUNT(*)
int Select(const char *query) const {
uint64_t rowsFound = 0;
memset(SelectMask, 0, byteSize);
for (int col = 0; col < attrsCount; ++col) {
if (!shouldColBeIndexed(attrsMaxValue[col])) {
continue;
}
auto col_value = query[col];
if (col_value < 0) {
for (int i = 0; i < attrsMaxValue[col]; ++i) {
BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + i);
}
} else {
BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + col_value);
}
}
uint64_t maxBytesValue = 0;
uint64_t byteVals = 0xff;
for (int i = 0; i < byteSize; ++i) {
maxBytesValue |= byteVals << (i * 8);
}
auto indexRecord = getRowPointer(0);
for (int i = 0; i < recordCount; ++i) {
rowsFound += BitString::equals(SelectMask, indexRecord, maxBytesValue);
indexRecord += byteSize;
}
return rowsFound;
}
};
void generateRecord(
char *record,
const unsigned int attr_sizes[],
const int attr_max_value[],
int attr_count
) {
auto offset = 0;
for (int c = 0; c < attr_count; ++c) {
if (attr_max_value[c] == -1) {
for (int j = 0; j < attr_sizes[c]; ++j) {
record[offset + j] = rand() % 256;
}
} else {
for (int j = 0; j < attr_sizes[c]; ++j) {
record[offset + j] = rand() % attr_max_value[c];
}
}
offset += attr_sizes[c];
}
}
int main() {
TableSchema schema;
const int attribute_count = 13;
const int record_count = 1000000;
// for simplicity sake, attr_max_value > 0 is set only for attributes, which size is 1.
unsigned int attr_sizes[attribute_count] = {1, 5, 1, 5, 1, 1, 1, 6, 1, 1, 1, 11, 1};
int attr_max_values[attribute_count] = {3, -1, 4, -1, 6, 5, 7, -1, 7, 6, 5, -1, 8};
schema.attrs_count = attribute_count;
schema.attr_sizes = attr_sizes;
schema.attr_max_values = attr_max_values;
schema.calculate_offsets();
srand((unsigned ) time(nullptr));
BitmapIndex bitmapIndex(attr_max_values, attribute_count, record_count);
char *record = new char[schema.record_size];
for (int i = 0; i < record_count; ++i) {
// generate some random records and add them to the index
generateRecord(record, attr_sizes, attr_max_values, attribute_count);
bitmapIndex.addRecord(record, attr_sizes);
}
char query[attribute_count] = {-1, -1, 0, -1, -1, 3, 2, -1, 3, 3, 4, -1, 6};
// simulate Select COUNT(*) WHERE a1 = -1, a2 = -1, a3 = 0, ...
auto found = bitmapIndex.Select(query);
printf("Query found: %d records\n", found);
delete[] record;
return 0;
}
If the record size was 8, both GCC and Clang would autovectorize, for example: (hopefully a sufficiently representative stand-in for your actual context in which the code occurs)
The important part of it, as compiled by GCC, looks like this:
Not bad. It uses the same ideas that I would used manually:
vpand
the data with a mask (simplification of your bitwise logic), compare it to zero, subtract the results of the comparisons (subtract because a True result is indicated with -1) from 4 counters packed in a vector. The four separate counts are added after the loop.By the way, note that I made
rowsFound
anuint64_t
. That's important. IfrowsFound
is not 64-bit, then both Clang and GCC will try very hard to narrow the count ASAP, which is exactly the opposite of a good approach: that costs many more instructions in the loop, and has no benefit. If the count is intended to be a 32-bit int in the end, it can simply be narrowed after the loop, where it is probably not merely cheap but actually free to do that.Something equivalent to that code would not be difficult to write manually with SIMD intrinsics, that could make the code less brittle (it wouldn't be based on hoping that compilers will do the right thing), but it wouldn't work for non-x86 platforms anymore.
If the records are supposed to be 7-byte, that's a more annoying problem to deal with. GCC gives up, Clang actually goes ahead with its auto-vectorization, but it's not good: the 8-byte loads are all done individually, the results then put together in a vector, which is all a big waste of time.
When doing it manually with SIMD intrinsics, the main problems would be unpacking the 7-byte records into qword lanes. An SSE4.1 version could use
pshufb
(pshufb
is from SSSE3, butpcmpeqq
is from SSE4.1 so it makes sense to target SSE4.1) to do this, easy. An AVX2 version could do a load that starts 2 bytes before the first record that it's trying to load, such that the "split" between the two 128-bit halves of the 256-bit registers falls between two records. Thenvpshufb
, which cannot move bytes from one 128-bit half to the other, can still move the bytes into place because none of them need to cross into the other half.For example, an AVX2 version with manual vectorization and 7-byte records could look something like this. This requires either some padding at both the end and the start, or just skip the first record and end before hitting the last record and handle those separately. Not tested, but it would at least give you some idea of how code with manual vectorization would work.