Replacement for parallel plyr with doMC

627 views Asked by At

Consider a standard grouped operation on a data.frame:

library(plyr)
library(doMC)
library(MASS) # for example

nc <- 12
registerDoMC(nc)

d <- data.frame(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"

res <- ddply(d, .(g), function(d_group) {
   # slow, complicated operations on d_group
}, .parallel = FALSE)

It's trivial to take advantage of a multi-core setup by simply writing .parallel = TRUE instead. This is one of my favorite features of plyr.

But with plyr being deprecated (I think) and essentially replaced by dplyr, purrr, etc., the solution to parallel processing has become significantly more verbose:

library(dplyr)
library(multidplyr)
library(parallel)
library(MASS) # for example

nc <- 12

d <- tibble(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"

cl <- create_cluster(nc)
set_default_cluster(cl)
cluster_library(cl, packages = c("MASS"))
cluster_copy(cl, obj = y)

d_parts <- d %>% partition(g, cluster = cl)
res <- d_parts %>% collect() %>% ungroup()

rm(d_parts)
rm(cl)

You can imagine how long this example could get considering each package and object you need inside the loop needs its own cluster_* command to copy it onto the nodes. The non-parallelized plyr-to-dplyr translation is just a simple dplyr::group_by construction and it's unfortunate that there's no terse way to enable parallel processing on it. So, my questions are:

  • Is this actually the preferred way to translate my code from plyr to dplyr?
  • What sort of magic is happening behind the scenes in plyr that makes it so easy to turn on parallel processing? Is there a reason this capability would be particularly difficult to add to dplyr and that's why it doesn't exist yet?
  • Are my two examples fundamentally different in terms of how the code is executed?
1

There are 1 answers

1
russellpierce On BEST ANSWER
  1. I don't think there is one true 'prefered' way to translate {plyr} code to {dplyr}.

  2. In the comments @Aurèle did a better job than I ever could in describing the connection between {plyr} and {doMC}. One thing that happened is that the incentives changed a bit. {doMC} is from Revolution Analytics (since purchased by Microsoft). But Hadley, who developed dplyr, currently works at RStudio. These two companies compete in the IDE space. So, it is perhaps natural that their packages aren't designed to play well together. The only form of parallelism I've seen strong support for coming out of RStudio is {sparklyr}, which they've made relatively 'easy' to set up. But, I can't really recommend futzing with Spark to do parallel processing for a single machine.

  3. @Aurèle again did a good job of explaining the execution differences. Your new code uses a PSOCK cluster and the old code used forks. Forks use a copy on write mode for accessing RAM, so parallel processes can start off with access to the same data immediately post fork. PSOCK clusters are like spawning new copies of R - they have to load libraries and receive an explicit copy of the data.

You can use a pattern like...

library(dplyr)
library(purrr)
library(future)
plan(multicore)
options(mc.cores = availableCores())
d <- data.frame(x = 1:8, g = c("group1", "group2", "group3", "group4"))
y <- "some global object"


split(d, d$g) %>% 
  map(~ future({Sys.sleep(5);mean(.x$x)})) %>% 
  map_df(~value(.x))

... with some finesse on the map_df step to do some parallel processing. Note that under {purrr} the ~ is anonymous function syntax where .x is the values that have been mapped in.

If you like to live dangerously, you might be able to create a version of something similar without using {future} by using a private method in {purrr}

mcmap <- function(.x, .f, ...) {
  .f <- as_mapper(.f, ...)
  mclapply(.x, function(.x) {
    force(.f)
    .Call(purrr:::map_impl, environment(), ".x", ".f", "list")
  }) %>%
    map(~ .x[[1]])
}