rollapply for large data using sparklyr


I want to estimate rolling value-at-risk (VaR) for a dataset of about 22.5 million observations, so I would like to use sparklyr for fast computation. Here is what I did, using a sample dataset:

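library(PerformanceAnalytics)  # managers data, zerofill(), VaR()
library(reshape2)              # melt()
library(dplyr)

data(managers)
data <- zerofill(managers)                     # replace NA returns with 0
data <- as.data.frame(data)
class(data)                                    # "data.frame"
data$date <- row.names(data)                   # keep the dates as a column
lmanagers <- melt(data, id.vars = c("date"))   # long format: date, variable, value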

Now I estimate VaR using dplyr and PerformanceAnalytics packages:

library(zoo) # for rollapply()
var <- lmanagers %>% group_by(variable) %>% arrange(variable, date) %>%
  mutate(var = rollapply(value, 10, FUN = function(x) VaR(x, p = .95, method = "modified"),
                         align = "right", partial = TRUE))
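One detail worth checking here: zoo::rollapply() centers its windows by default, so `align = "right"` has to be passed to rollapply() itself, not to VaR(), for each value to depend only on the current and preceding observations. A minimal base-R sketch of the windows that `rollapply(x, width, FUN, align = "right", partial = TRUE)` evaluates; `roll_right()` is a hypothetical helper written only to make those semantics explicit:

```r
# Hypothetical helper: right-aligned rolling window that also evaluates the
# shorter, partial windows at the start of the series.
roll_right <- function(x, width, FUN) {
  vapply(seq_along(x), function(i) {
    window <- x[max(1L, i - width + 1L):i]  # current value plus up to width - 1 predecessors
    FUN(window)
  }, numeric(1))
}

roll_right(1:5, 3, mean)  # windows: 1 | 1,2 | 1,2,3 | 2,3,4 | 3,4,5
```

Any Spark-side replacement has to reproduce this same per-manager, date-ordered window of the current value plus up to nine preceding ones.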

This works. To make use of sparklyr, I then tried:

library(sparklyr)
sc <- spark_connect(master = "local")
lmanagers_sp <- copy_to(sc, lmanagers)
src_tbls(sc)

var_sp <- lmanagers_sp %>% group_by(variable) %>% arrange(variable, date) %>%
  mutate(var = rollapply(value, 10, FUN = function(x) VaR(x, p = .95, method = "modified"),
                         align = "right", partial = TRUE)) %>%
  collect()

But this gives the following error:

Error: Unknown input type: pairlist

Can anyone tell me where the error is and what the correct code would be? Any other way to estimate rolling VaR faster is also appreciated.

Answer by Javier Luraschi

With a remote dplyr backend such as sparklyr, mutate() is translated into Spark SQL, so it only supports functions that have an SQL translation. Arbitrary R functions from other packages, such as rollapply(), are therefore not supported.

To calculate value-at-risk in sparklyr, one option is to extend sparklyr with Scala and R code, following an approach similar to the one described in "Estimating Financial Risk with Apache Spark".
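On the R side, such an extension mostly comes down to handing the Spark DataFrame to JVM code and registering the result. A hedged sketch, assuming a hypothetical Scala object `com.example.RollingVaR` with a method `compute(df: DataFrame, window: Int, p: Double): DataFrame`, packaged into a jar that is already on Spark's classpath. Only `spark_connection()`, `spark_dataframe()`, `invoke_static()` and `sdf_register()` come from sparklyr; the class, method and argument types are assumptions that must match whatever the Scala code actually exposes:

```r
# Hypothetical wrapper: com.example.RollingVaR and its compute(df, window, p)
# method are assumptions, not part of sparklyr.
rolling_var_spark <- function(sdf, window = 10L, p = 0.95) {
  sc <- sparklyr::spark_connection(sdf)   # connection behind the tbl_spark
  jdf <- sparklyr::spark_dataframe(sdf)   # reference to the underlying Java DataFrame
  result <- sparklyr::invoke_static(
    sc, "com.example.RollingVaR", "compute",
    jdf, as.integer(window), as.numeric(p)
  )
  sparklyr::sdf_register(result)          # wrap the returned DataFrame as a tbl_spark
}
```

With the jar loaded, usage would look like `var_sp <- rolling_var_spark(lmanagers_sp) %>% collect()`.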

Answer by C8H10N4O2

Let me break your question into two tasks:

  • how to do a rolling self-join (i.e., a.manager_id = b.manager_id and a.date < b.date and b.date <= a.date + 10) with the sparklyr interface
  • how to use a custom function (i.e., VaR) with the sparklyr interface

The first task might be possible with dplyr verbs, which support a limited set of window functions, including lead() and lag(). You'd probably end up with something really ugly, along the lines of (coalesce(lag(return, 1), 0) + coalesce(lag(return, 2), 0) + coalesce(lag(return, 3), 0)) / (3 - is.na(lag(return, 1)) - is.na(lag(return, 2)) - is.na(lag(return, 3))), just as a generic example of averaging whichever previous values are available. (Unfortunately, conditional joins such as date windows are still unsupported in dplyr; this question comes up often, e.g. this one.)
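For instance, on a local data frame with hypothetical columns `manager`, `date` and `ret`, the lag-based average of the three previous returns can be written as below; coalesce() keeps a missing lag from turning the whole sum into NA. The same verbs should translate to Spark SQL window functions on a tbl_spark, but a 10-observation window needs ten such terms, which is why this gets ugly quickly:

```r
library(dplyr)

# Hypothetical toy data: two managers, four dates each.
df <- data.frame(
  manager = rep(c("A", "B"), each = 4),
  date    = rep(1:4, 2),
  ret     = c(1, 2, 3, 4, 10, 20, 30, 40)
)

out <- df %>%
  arrange(manager, date) %>%
  group_by(manager) %>%
  mutate(
    # how many of the three previous returns exist
    n_prev = 3 - is.na(lag(ret, 1)) - is.na(lag(ret, 2)) - is.na(lag(ret, 3)),
    # sum of the previous returns, treating missing lags as 0
    s = coalesce(lag(ret, 1), 0) + coalesce(lag(ret, 2), 0) + coalesce(lag(ret, 3), 0),
    avg_prev3 = ifelse(n_prev > 0, s / n_prev, NA_real_)
  ) %>%
  ungroup()

out$avg_prev3
```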

It would be much easier to write the first task directly in Spark SQL (using the conditional self-join described above) and run it with DBI::dbGetQuery().
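A sketch of such a query, assuming the table created by copy_to() is named lmanagers with the columns date, variable and value. Because the managers data are monthly, this version windows on row numbers (the current row plus the nine before it) rather than on a date offset; `rolling_window_sql()` is a hypothetical helper that only builds the SQL string:

```r
# Hypothetical helper: builds a Spark SQL rolling self-join that pairs each row
# with itself and the (window - 1) preceding rows of the same manager.
rolling_window_sql <- function(table = "lmanagers", window = 10L) {
  sprintf(
    "WITH indexed AS (
  SELECT variable, `date`, value,
         ROW_NUMBER() OVER (PARTITION BY variable ORDER BY `date`) AS rn
  FROM %s
)
SELECT a.variable, a.`date`, b.value AS window_value
FROM indexed a
JOIN indexed b
  ON a.variable = b.variable
 AND b.rn BETWEEN a.rn - %d AND a.rn
ORDER BY a.variable, a.`date`",
    table, as.integer(window) - 1L
  )
}

query <- rolling_window_sql("lmanagers", 10L)
# With a live connection `sc`:
# windows <- DBI::dbGetQuery(sc, query)
```

dbGetQuery() pulls the result into R; with 22.5 million observations the self-join yields roughly ten rows per observation, so keeping it in Spark (for example with dplyr::tbl(sc, dplyr::sql(query))) and handing it to the UDF is probably preferable.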

The second task is statistical: it can't be expressed simply with dplyr or plain SQL, and it depends on an R package (PerformanceAnalytics) that sparklyr cannot translate. You therefore need a Scala (or Python) user-defined function (UDF) to calculate VaR, such as the approach linked in the other answer.
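To show roughly what such a UDF would have to compute per window, here is a base-R sketch of Cornish-Fisher ("modified") VaR. It assumes the standard Cornish-Fisher expansion with population central moments; `cornish_fisher_z()` and `modified_var()` are hypothetical helpers, and their moment estimators and sign convention may not match PerformanceAnalytics::VaR() exactly:

```r
# Cornish-Fisher adjusted normal quantile: S = skewness, K = excess kurtosis.
cornish_fisher_z <- function(p, S, K) {
  z <- qnorm(1 - p)  # p = 0.95 gives the 5% lower-tail quantile
  z + (z^2 - 1) * S / 6 +
    (z^3 - 3 * z) * K / 24 -
    (2 * z^3 - 5 * z) * S^2 / 36
}

# Hypothetical helper: modified VaR of one window of returns, reported as a
# (usually negative) return quantile.
modified_var <- function(x, p = 0.95) {
  if (length(x) < 3L) return(NA_real_)      # moments are meaningless on tiny windows
  mu <- mean(x)
  if (length(unique(x)) == 1L) return(mu)   # constant window, e.g. zero-filled data
  m2 <- mean((x - mu)^2)                    # population central moments
  S <- mean((x - mu)^3) / m2^1.5
  K <- mean((x - mu)^4) / m2^2 - 3
  mu + sqrt(m2) * cornish_fisher_z(p, S, K)
}
```

With zero skewness and zero excess kurtosis the adjustment vanishes and the result reduces to the Gaussian quantile, which is a quick sanity check when porting these few lines to Scala.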

tl;dr: The first task is doable via sparklyr (but using SQL, not dplyr). The second task requires an outside UDF that you can then call via sparklyr's invoke().