How to call user-defined function in RcppParallel?

1.7k views Asked by At

Inspired by the artical http://gallery.rcpp.org/articles/parallel-distance-matrix/, I try to use RcppParallel to run brute-force search in high-dimensional parametric space for backtesting using multithreads. I am stuck in how to call a self-defined function in the struct part. The idea is like this:

First, create a parametric matrix NumericMatrix params_mat in R first, and use the backtesting data with List, NumericVector, CharacterVector datatype, such as List Data_1, NumericVector Data_2, CharacterVector Data_3, ..., which are static for each parametric scenario params_vec (Note that it is the row of params_mat).

Next, define the backtesting function that output a vector that consisting 3 key variables to evaluate the strategy performance.

Here is an example of my params_mat and Backtesting_Fun that can be run in R and Rcpp, respectively.

//[[Rcpp::export]]
NumericMatrix data_frame_rcpp(const Rcpp::List& list_params) 
{
  NumericMatrix res = list_params[0];
  return res;
}

# R codes to generate params_mat
params <- expand.grid (x_1=seq(1,100,1), x_2=seq(3,100,2), ..., x_n=seq(4,200,1));                           
list_params = list(ts(params));
tmp_params_data = data_frame_rcpp(list_params);                                              
params_mat = matrix(tmp_params_data, ncol = ncol(tmp_params_data), dimnames = NULL); 
params_vec = params_mat[ii,];

# User-defined Rcpp codes for backtesting
NumericVector Backtesting_Fun (List Data_1, NumericVector Data_2, CharacterVector Data_3, ..., NumericVector params_vec)
{
  // Main function parts to run backtesting for each params_vec scenario.
  ... etc

  // save 3 key result variables together with each params_vec (just a simple illustration).
  NumericVector res = NumericVector::create(params_vec[0],...,params_vec[size-1],
                                            key_1, key_2, key_3); 
  return res;
}

Certainly we need to rewrite/modify the original Rcpp Backtesting_Fun with RVector/RMatrix types, and then use the following RcppParallelcodes to call Backtesting_Fun in struct Backtest_parallel:

// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;

RVector<double> Backtesting_Fun (const RVector<double> Data_1, const RVector<double> Data_2, 
                                const RVector<string> Data_3,..., const RVector<double> params_vec)
{
   // Main function parts to run backtesting for each params_vec scenario.
   ... etc;

   // save 3 key result variables together with each params_vec
   ... etc; 

   return res;
}

struct Backtest_parallel : public Worker 
{       
   // input matrix to read from
   const RVector<List> Data_1;
   const RVector<double> Data_2;
   const RVector<string> Data_3;
   ...
   const RMatrix<double> params_mat;

   // output matrix to write to
   RMatrix<double> rmat;

   // initialize from Rcpp input and output matrixes (the RMatrix class
   // can be automatically converted to from the Rcpp matrix type)
   Backtest_parallel(const List Data_1, const NumericVector Data_2, 
   const CharacterVector Data_3, ..., const NumericMatrix params_mat)
      : Data_1(Data_1), Data_2(Data_2), Data_3(Data_3), ..., params_mat(params_mat) {}

   // function call operator that work for the specified range (begin/end)
   void operator()(std::size_t begin, std::size_t end) 
   {
      for (std::size_t ii = begin; ii < end; i++) 
      {
         // params rows that we will operate on
         RMatrix<double>::Row params_row = params_mat.row(ii);

         // Run the backtesting function defined above
         RVector<double> res = Backtesting_Fun(Data_1, Data_2, ..., params_row)
         for (std::size_t jj = 0; jj < res.length(); jj++) 
         {
            // write to output matrix
            rmat(ii,jj) = res[jj];
         }
      }
   }
};

// [[Rcpp::export]]
NumericMatrix rcpp_parallel_backtest(List Data_1, NumericVector Data_2, CharacterVector Data_3,
                                     ..., NumericMatrix params_mat) 
{      
   // allocate the matrix we will return
   NumericMatrix rmat(params_mat.nrow(), params_mat.nrow()+3);

   // create the worker
   Backtest_parallel backtest_parallel(Data_1, Date_2, ..., params_mat);

   // call it with parallelFor
   parallelFor(0, rmat.nrow(), backtest_parallel);

   return rmat;
}

Here are my questions:

  1. Can RVector contains List datatype, or is there any specific container in RcppParallel to contain List;

  2. In the Backtesting_Fun, the input should be RVector/RMatrix types, does that mean we really need to convert the orginal Rcpp main codes with NumericVector into RVector?

Or is there any better way to do parallel computing for my case in RcppParallel? Thanks in advance.

EDIT:

  1. I look at the other examples regarding RcppPararrel in http://gallery.rcpp.org/articles/parallel-matrix-transform/, http://gallery.rcpp.org/articles/parallel-inner-product/, the common idea in struct operator()is to use pointers to manipulate the data input for operator(), so is there any way to build a user defined function in my case with pointer inputs?

  2. If the above way doesn't work, is it feasible to use wrap to convert RVector/RMatrix back into Rcpp datatype, i.e., NumericVector.. in operator() so that the input types of user-defined function Backtesting_Fun can remain unchanged.

1

There are 1 answers

0
Alvin On

I think I may find an alternative way to solve this question: The keys are to use a thread safe accessors to contain variables within the struct, and remain RVector / RMatrix in the outside main function so that the parallelFor can work fine, which is the most important part in this parallel algo. Here are my ways:

  1. Get rid of List datatype: Instead, we can convert the List variable by using NumericVector / NumericMatrix container and record its corresponding index so that the subvector/submatrix will point the same elements as it was as a list's element.

  2. Convert RVector / RMatrix into arma::vec / arma::mat: As mentioned in RcppParallel Github, C++ Armadillo are thread-safe in struct's operator. Here, I modify the example given in Parallel Distance Matrix Calculation with RcppParallel by using this idea, which nearly remains the same test speed.

    struct JsDistance : public Worker
    {
      const RMatrix<double> tmp_MAT;  // input matrix to read from
      RMatrix<double> tmp_rmat;       // output matrix to write to
      std::size_t row_size, col_size;
    
      // Convert global input/output into RMatrix/RVector type
      JsDistance(const NumericMatrix& matrix_input, NumericMatrix& matrix_output, 
                 std::size_t row_size, std::size_t col_size)
        : tmp_MAT(matrix_input), tmp_rmat(matrix_output), row_size(row_size), col_size(col_size) {}
    
      // convert RVector/RMatrix into arma type for Rcpp function
      // and the follwing arma data will be shared in parallel computing
      arma::mat convert()
      {
        RMatrix<double> tmp_mat = tmp_MAT;
        arma::mat MAT(tmp_mat.begin(), row_size, col_size, false);
        return MAT;
      }
    
    
      void operator()(std::size_t begin, std::size_t end)
      {
        for (std::size_t i = begin; i < end; i++)
        {
          for (std::size_t j = 0; j < i; j++)
          {
            // rows we will operate on
            arma::mat MAT = convert();
            arma::rowvec row1 = MAT.row(i);          // get the row of arma matrix
            arma::rowvec row2 = MAT.row(j);
    
            // compute the average using std::tranform from the STL
            std::vector<double> avg(row1.n_elem);
            std::transform(row1.begin(), row1.end(), // input range 1
                           row2.begin(),             // input range 2
                           avg.begin(),              // output range
                           average);                 // function to apply
    
            // calculate divergences
            double d1 = kl_divergence(row1.begin(), row1.end(), avg.begin());
            double d2 = kl_divergence(row2.begin(), row2.end(), avg.begin());
    
            // write to output matrix
            tmp_rmat(i,j) = sqrt(.5 * (d1 + d2));
          }
        }
      }
    };
    
    // [[Rcpp::export]]
    NumericMatrix rcpp_parallel_js_distance_modify(const Rcpp::NumericMatrix& matrix_input, int N_cores)
    {
      // allocate the matrix we will return
      NumericMatrix matrix_output(matrix_input.nrow(), matrix_input.nrow());
      std::size_t row_size = matrix_input.nrow();
      std::size_t col_size = matrix_input.ncol();
    
      // create the worker
      JsDistance jsDistance(matrix_input, matrix_output, row_size, col_size);
    
      // call it with parallelFor
      parallelFor(0, matrix_input.nrow(), jsDistance, matrix_input.nrow()/N_cores);           // parallelFor with grain size setting
    
      return matrix_output;
     }
    
     // Example compare: 
     n_row = 1E3;
     n_col = 1E2;
     m = matrix(runif(n_row*n_col), nrow = n_row, ncol = n_col);
     m = m/rowSums(m);
    
     res <- benchmark(rcpp_parallel_js_distance(m, 6),
             rcpp_parallel_js_distance_orignal(m, 6),
             order="relative")
     res[,1:4];
    
     #test                                    #elapsed   #relative
     rcpp_parallel_js_distance_orignal(m, 6)  128.069    1.000
     rcpp_parallel_js_distance(m, 6)          129.210    1.009
    

As we can see, the datatype within operator will be C++ arma, and now we can safely and fastly call our user-defined function by directly using the object instead of pointers only, which may not be generic or easily designed.

Now, this parallelFor structure will share the same data source without extra copy in parallel computing, and then we can do some slightly change for backtesting by using the idea mentioned in the above question.