Eigen sparse matrix inversion takes an abnormally long time

After assembling the sparse matrix mat, I used one of the sparse solvers (https://eigen.tuxfamily.org/dox/group__TopicSparseSystems.html) to invert it. The following code took about 2 s to invert a 500,000 x 500,000 sparse matrix:

    Eigen::SparseMatrix<double> I(n,n);
    I.setIdentity();
    Eigen::SimplicialLLT<Eigen::SparseMatrix<double> > solver;
    solver.compute(mat);
    if (solver.info() != Eigen::Success){
        cout<<"not invertible"<<endl;
    }
    else{
        cout<<"invertible "<<endl;
        solver.solve(I);
    }

But the following took more than 3 hours:

    Eigen::SparseMatrix<double> I(n,n);
    I.setIdentity();
    Eigen::SimplicialLLT<Eigen::SparseMatrix<double> > solver;
    solver.compute(mat);
    if (solver.info() != Eigen::Success){
        cout<<"not invertible"<<endl;
    }
    else{
        cout<<"invertible "<<endl;
        Eigen::SparseMatrix<double> invMat;
        invMat.reserve(250000 * round(250000/10));
        invMat=solver.solve(I);
     }

The line invMat.reserve(250000 * round(250000/10)); ran quickly, but the line invMat = solver.solve(I); took forever.

Are there any ways to reduce the running time?

I also tried reserving a large amount of memory: invMat.reserve(n*n); // n is the size of the matrix to be inverted. But it still ran forever.

There are 2 answers

chtz

In your first example, only the decomposition is computed. solver.solve(I) just returns a proxy object -- the actual inversion happens when you assign it to something.
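
As a minimal sketch of where the cost actually falls, reusing mat and I from the question (the comments describe Eigen's lazy evaluation, not measured timings):

    Eigen::SimplicialLLT<Eigen::SparseMatrix<double>> solver(mat); // factorization
    auto expr = solver.solve(I);                // cheap: only builds a Solve<> expression
    Eigen::SparseMatrix<double> inv = expr;     // expensive: the n right-hand-side solves run here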

You should (almost) never explicitly need to calculate the inverse of any matrix -- especially inverting a sparse matrix will in general result in a mostly dense matrix (this depends on the structure of the input matrix of course).
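
A minimal sketch of the pattern this suggests, with b standing in for whatever right-hand side you actually need (an assumption, since the question does not show one): keep the factorization and apply it directly instead of materializing the inverse.

    Eigen::SimplicialLLT<Eigen::SparseMatrix<double>> solver;
    solver.compute(mat);
    if (solver.info() == Eigen::Success) {
        Eigen::VectorXd x = solver.solve(b);    // solves mat * x = b without forming mat^-1
    }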

Homer512

To expand on my proposal to treat the block-diagonal matrix as a set of smaller, dense matrices, here is a reasonable implementation:

We use a single vector to store all blocks. I use a std::vector instead of an Eigen::Vector to allow dynamic growth while building the matrix. Of course this could also be achieved with an Eigen::Vector and manual exponential growth, which could be a bit faster since it avoids std::vector's zero-initialization.

Offsets into that vector and block sizes are kept as separate vectors. These could be combined into one vector of structs. However, there is a chance that you want to widen the integer type for the offsets to Eigen::Index while keeping the block sizes as int, in which case a common struct would waste space on padding.

#include <Eigen/Dense>
#include <vector>

template<class Scalar, int Options=Eigen::ColMajor>
class BlockDiagonal
{
public:
    using value_type = Scalar;
    enum { flags = Options };
    using StorageVector = std::vector<Scalar>;
    using BlockMatrix = Eigen::Matrix<
            Scalar, Eigen::Dynamic, Eigen::Dynamic, Options>;
    using BlockMap = Eigen::Map<BlockMatrix>;
    using ConstBlockMap = Eigen::Map<const BlockMatrix>;
private:
    /**
     * Flattened storage of all blocks
     *
     * All BlockMaps point into this array
     */
    StorageVector storage_;
    /**
     * Index of first row / column per block
     *
     * For N blocks, contains N + 1 entries so that the size of the last block
     * can be computed
     */
    std::vector<int> blockstarts_;
    /**
     * For each block, the index in storage_ where its data lies
     */
    std::vector<int> blockoffsets_;
public:
    BlockDiagonal() noexcept
    : blockstarts_({0})
    {}
    int rows() const noexcept
    { return blockstarts_.back(); }

    int cols() const noexcept
    { return rows(); }

    Eigen::Index blocks() const noexcept
    { return static_cast<Eigen::Index>(blockoffsets_.size()); }

    int nonZeros() const noexcept
    { return static_cast<int>(storage_.size()); }

    const Scalar* data() const noexcept
    { return storage_.data(); }

    Scalar* data() noexcept
    { return storage_.data(); }

    int blockStart(Eigen::Index i) const noexcept
    {
        assert(i < blocks());
        return blockstarts_[i];
    }

Blocks can then simply be mapped on demand with minimal overhead.

    BlockMap operator[](Eigen::Index i) noexcept
    {
        assert(i < blocks());
        const int size = blockstarts_[i + 1] - blockstarts_[i];
        const int offset = blockoffsets_[i];
        return BlockMap(storage_.data() + offset, size, size);
    }
    ConstBlockMap operator[](Eigen::Index i) const noexcept
    {
        assert(i < blocks());
        const int size = blockstarts_[i + 1] - blockstarts_[i];
        const int offset = blockoffsets_[i];
        return ConstBlockMap(storage_.data() + offset, size, size);
    }

And building the matrix from dense block expressions can be done from the top left to the bottom right.

    template<class Derived>
    void addBlock(const Eigen::MatrixBase<Derived>& block)
    {
        const int rows = static_cast<int>(block.rows());
        assert(rows == block.cols() && "blocks need to be square");
        std::size_t end = storage_.size();
        storage_.resize(end + static_cast<unsigned>(rows * rows));
        BlockMap(storage_.data() + end, rows, rows) = block;
        blockstarts_.push_back(blockstarts_.back() + rows);
        blockoffsets_.push_back(static_cast<int>(end));
    }

Operations such as matrix multiplication or solving linear systems can be built from the dense operations per block. This also allows parallelization. However, this might need some tuning depending on the block sizes and on whether they vary wildly. For example, it might be better to rely on Eigen's internal parallelization for certain multiplication patterns.

    template<class Derived>
    typename Eigen::MatrixBase<Derived>::PlainObject
    operator*(const Eigen::MatrixBase<Derived>& right) const
    {
        assert(right.rows() == cols());
        using RtrnType = typename Eigen::MatrixBase<Derived>::PlainObject;
        RtrnType rtrn(rows(), right.cols());
        const Eigen::Index n = blocks();
#       pragma omp parallel for schedule(dynamic)
        for(Eigen::Index i = 0; i < n; ++i) {
            int start = blockStart(i);
            ConstBlockMap block = (*this)[i];
            Eigen::Index size = block.rows();
            rtrn.middleRows(start, size).noalias() =
                    block * right.middleRows(start, size);
        }
        return rtrn;
    }
    template<class Derived>
    typename Eigen::MatrixBase<Derived>::PlainObject
    solveLU(const Eigen::MatrixBase<Derived>& b) const
    {
        assert(b.rows() == rows());
        using RtrnType = typename Eigen::MatrixBase<Derived>::PlainObject;
        RtrnType rtrn(b.rows(), b.cols());
        const Eigen::Index n = blocks();
#       pragma omp parallel
        {
            Eigen::PartialPivLU<BlockMatrix> lu;
#           pragma omp for nowait schedule(dynamic)
            for(Eigen::Index i = 0; i < n; ++i) {
                int start = blockStart(i);
                ConstBlockMap block = (*this)[i];
                Eigen::Index size = block.rows();
                lu.compute(block);
                rtrn.middleRows(start, size) =
                        lu.solve(b.middleRows(start, size));
            }
        }
        return rtrn;
    }
};

Since you wanted to compute the inverse: for starters, this can now be done straightforwardly block-by-block, as sketched below. Alternatively, we can use Eigen's in-place matrix decompositions to achieve the same result with hopefully somewhat better numerical properties.
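
As a rough sketch of the block-by-block route, assuming the BlockDiagonal class above; invertBlockwise is a hypothetical helper, not part of the class interface, that overwrites each block with its dense inverse:

template<class Scalar, int Options>
void invertBlockwise(BlockDiagonal<Scalar, Options>& matrix)
{
    const Eigen::Index n = matrix.blocks();
#   pragma omp parallel for schedule(dynamic)
    for(Eigen::Index i = 0; i < n; ++i) {
        // matrix[i] maps the i-th block in place; eval() materializes the
        // inverse into a temporary before writing back, avoiding aliasing
        auto block = matrix[i];
        block = block.inverse().eval();
    }
}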

The in-place variant is a bit awkward to implement because we need to keep an array of solver objects that don't have default constructors or copy-assignment operators. So we have to do manual memory management with placement new.

#include <cstdlib> // std::malloc, std::free
#include <new>     // placement new

template<class BlockDiagonalType>
class InplaceLU
{
public:
    using BlockDiagonal = BlockDiagonalType;
    using BlockType = typename BlockDiagonal::BlockMatrix;
private:
    /** LU decomposition type that works inplace */
    using LUType = Eigen::PartialPivLU<Eigen::Ref<BlockType>>;
    BlockDiagonal* const parent_;
    LUType* const blocks_;
public:
    InplaceLU(BlockDiagonal& matrix)
    : parent_(&matrix),
      blocks_(static_cast<LUType*>(std::malloc(
            sizeof(LUType) * matrix.blocks())))
    {
        const Eigen::Index n = matrix.blocks();
#       pragma omp parallel for schedule(dynamic)
        for(Eigen::Index i = 0; i < n; ++i) {
            Eigen::Ref<BlockType> block(matrix[i]);
            new (blocks_ + i) LUType(block);
        }
    }
    InplaceLU(const InplaceLU&) = delete;
    InplaceLU& operator=(const InplaceLU&) = delete;
    ~InplaceLU()
    {
        const Eigen::Index n = parent_->blocks();
        for(Eigen::Index i = n; i; --i)
            blocks_[i - 1].~LUType();
        std::free(blocks_);
    }
    template<class Derived>
    typename Eigen::MatrixBase<Derived>::PlainObject
    solve(const Eigen::MatrixBase<Derived>& b) const
    {
        assert(b.rows() == parent_->cols());
        using RtrnType = typename Eigen::MatrixBase<Derived>::PlainObject;
        RtrnType rtrn(b.rows(), b.cols());
        const Eigen::Index n = parent_->blocks();
#       pragma omp parallel for schedule(dynamic)
        for(Eigen::Index i = 0; i < n; ++i) {
            const int start = parent_->blockStart(i);
            LUType& lu = blocks_[i];
            Eigen::Index size = lu.rows();
            rtrn.middleRows(start, size) =
                    lu.solve(b.middleRows(start, size));
        }
        return rtrn;
    }
};

If we want a regular sparse matrix from our block-diagonal, we can adapt it with Eigen's Map specialization for sparse matrices. We need two auxiliary vectors which would normally be part of the SparseMatrix object itself.

#include <Eigen/Sparse>

template<class BlockDiagonalType>
class SparseAdapter
{
public:
    using BlockDiagonal = BlockDiagonalType;
    using BlockType = typename BlockDiagonal::BlockMatrix;
    using value_type = typename BlockDiagonal::value_type;
    using SparseMatrix = Eigen::SparseMatrix<value_type, BlockDiagonal::flags>;
    using SparseMap = Eigen::Map<SparseMatrix>;
private:
    /** nonzero indices along inner dimension. One per scalar */
    Eigen::VectorXi innerIndices_;
    /** start offset along outer dimension + 1 for end */
    Eigen::VectorXi outerIndices_;
    SparseMap mapping_;

    static Eigen::VectorXi makeInner(BlockDiagonal& matrix)
    {
        Eigen::VectorXi rtrn(matrix.nonZeros());
        int* out = rtrn.data();
        for(Eigen::Index i = 0, n = matrix.blocks(); i < n; ++i) {
            const auto block = matrix[i];
            assert(block.rows() == block.cols());
            const int start = matrix.blockStart(i);
            for(int j = 0, m = block.cols(); j < m; ++j)
                for(int k = 0; k < m; ++k)
                    *(out++) = start + k;
        }
        return rtrn;
    }
    static Eigen::VectorXi makeOuter(BlockDiagonal& matrix)
    {
        assert(matrix.rows() == matrix.cols());
        Eigen::VectorXi rtrn(matrix.cols() + 1);
        int* out = rtrn.data();
        int offset = 0;
        for(Eigen::Index i = 0, n = matrix.blocks(); i < n; ++i) {
            const auto block = matrix[i];
            assert(block.rows() == block.cols());
            for(int j = 0, m = block.cols(); j < m; ++j) {
                *(out++) = offset;
                offset += m;
            }
        }
        *out = offset;
        return rtrn;
    }
public:
    explicit SparseAdapter(BlockDiagonal& matrix)
    : innerIndices_(makeInner(matrix)),
      outerIndices_(makeOuter(matrix)),
      mapping_(matrix.rows(), matrix.cols(), innerIndices_.size(),
               outerIndices_.data(), innerIndices_.data(), matrix.data())
    {}
    SparseMap& view() noexcept
    { return mapping_; }

    const SparseMap& view() const noexcept
    { return mapping_; }
};

Use it like this:

#include <random>
#include <iostream>

int main()
{
    int size = 10;
    std::default_random_engine rng;
    std::uniform_int_distribution<int> distr(2, 4);
    BlockDiagonal<double> matrix;
    for(int offset = 0, blocksize; offset < size; offset += blocksize) {
        blocksize = std::min(size - offset, distr(rng));
        Eigen::MatrixXd block = Eigen::MatrixXd::Random(blocksize, blocksize);
        std::cout << "Block\n" << block << "\n\n";
        matrix.addBlock(block);
    }
    SparseAdapter<BlockDiagonal<double>> sparse(matrix);
    std::cout << "Sparse\n" << sparse.view() << "\n\n";
    Eigen::MatrixX3d dense = Eigen::MatrixX3d::Random(size, 3);
    std::cout << "Product\n" << (matrix * dense) << "\n\n";
    std::cout << "Sparse Product\n" << (sparse.view() * dense) << "\n\n";
    std::cout << "Solution\n" << matrix.solveLU(dense) << "\n\n";
    Eigen::SparseLU<Eigen::SparseMatrix<double>> solver(sparse.view());
    std::cout << "Sparse solution\n" << solver.solve(dense) << "\n\n";
    InplaceLU<BlockDiagonal<double>> inplace(matrix);
    std::cout << "inplace solution\n" << inplace.solve(dense) << "\n\n";
}

In my tests this improved performance significantly with 100,000 rows/cols and block sizes in the range [2, 1000].