Run breakpoint (lm) detection in parallel in R

379 views Asked by At

I am doing about 80000 time series breakpoint detection calculations in R. I have all these extremely different time series where I cannot apply ARIMA models so I am calculating a linear model per time series, then extract the breakpoints and use the fitted results of the regression to calculate the trend coming from the last breakpoint.
Example of one time series

In the above example the algorithm would detect three breakpoints (one incline, one rather flat and one decline). It's a perfect fit for my needs but running 80k breakpoint calculations once a week sequentially is just too much overhead, hence I am trying to implement this by using parallel processing in R.

In this example (find link to data below) I am calculating the breakpoints sequentially which is taking about 24h for all 88k.

df.subset <- read.csv("dfsubset.csv)"
start <- Sys.time()

All.Breakpoints <- df.subset %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))

Sys.time() - start

In this code snippet I am running the detection on 10 time series (on my mac) which is taking 47 seconds. I would guess that parallelising should reduce this benchmark time to about 1/4 pf the time.

Below I have listed three ways I have tried to parallelise the calculation but I cannot get the nested apply to work in a parallel setting.

With the parallel package

clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(tidyr); library(magrittr)})

myfunction <- function(df.subset) {
All.Breakpoints <- df.subset %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
return(All.Breakpoints)
}

clusterExport(clus, "myfunction")

do.call(bind_rows, parApply(clus, df.subset, 1,{function(r) { 
myfunction(r[1]) }}))

With the multidplyr package:

library(multidplyr)
cluster <- create_cluster(4)
set_default_cluster(cluster)

four <- function(x) {
All.Breakpoints <- x %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
return(All.Breakpoints)
}

cluster_assign_value(cluster, 'four', four)
save <- df.subset %>% partition(CONC_ID) %>% map(four(.))

With the parallel package but other grouping

library(parallel)
cl <- detectCores()

group <- df.subset %>% group_by(CONC_ID) %>% group_indices
df.subset <- bind_cols(tibble(group), df.subset)

cluster <- create_cluster(cores = cl)

by_group <- df.subset %>%
partition(group, cluster = cluster)

by_group %>%
# Assign libraries
cluster_library("tidyr") %>%
cluster_library("dplyr") %>%
cluster_library("strucchange") %>%
cluster_library("purrr") %>%
# Assign values (use this to load functions or data to each core)
cluster_assign_value("df.subset", df.subset) 

cluster_eval(by_group, search())[[1]] # results for first cluster shown 
only
cluster_get(by_group, "df.subset")[[1]]

start <- proc.time() # Start clock
sp_500_processed_in_parallel <- by_group %>% # Use by_group party_df
group_by(CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .))) 
%>%
collect() %>% # Special collect() function to recombine partitions
as_tibble()   # Convert to tibble
time_elapsed_parallel <- proc.time() - start # End clock
time_elapsed_parallel

Link to file:

http://www.filedropper.com/dfsubset

I appreciate your ideas and feedback!

1

There are 1 answers

0
Jonathan On BEST ANSWER

Asking a question and describing the problem will solve it for yourself most of the time... I figured out that mutate does not work anywhere (keep me honest, Stackoverflow) in parallel in R.

I therefore changed to using do and distribute the load via multidplyr and get a decrease in compute time of about 50% when going from 1 to 4 cores and to 25% of total time when going from 1 to 8 cores.

Code below.

## parallel
cl <- detectCores()
cl

df.cluster <- df.subset

cluster <- create_cluster(cores = cl)
cluster

by_group <- df.cluster %>%
partition(CONC_ID, cluster = cluster)
by_group

by_group %>%

# Assign libraries
cluster_library("strucchange")
cluster_eval(by_group, search())[[1]] # results for first cluster shown only

start <- proc.time() # Start clock

cluster.processed <- by_group %>%
                     do(model = breakpoints(ACT_QTY_new ~ Index, data = .)) %>%
                     collect()

time_elapsed_parallel <- proc.time() - start # End clock
time_elapsed_parallel

rm(by_grou)
gc()

Predictions <- cluster.processed %>%
mutate(SegmentedForecast = map(model, fitted))
df.fitted.vector <- as.data.frame(rowwise(Predictions[,3])) .