Using pooledMapConcurrentlyN in polysemy

I'm currently playing around with Polysemy, rewriting a small toy project of mine to get used to it. I've stumbled upon a piece of code that uses pooledMapConcurrentlyN, which is basically a parallel version of traverse with bounded concurrency.

I can strip my example down to this:

foo :: Sem r Int
foo = do
  res <- pooledMapConcurrentlyN 3 action (["foo", "bar", "baz"] :: [String])
  pure $ sum res

action :: String -> Sem r Int
action = pure . length

This doesn't compile because there's no instance for MonadUnliftIO (Sem r). It does compile when I use traverse, but I'm looking for a concurrent version. I'm not sure which way I should go now.
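
For contrast, the same call does typecheck when the action runs directly in IO, because IO has a MonadUnliftIO instance. A minimal sketch, assuming the unliftio package is installed and pooledMapConcurrentlyN is imported from UnliftIO.Async; actionIO is a hypothetical IO counterpart of action above:

```haskell
import UnliftIO.Async (pooledMapConcurrentlyN)

-- Hypothetical IO version of `action`; same shape, but in IO instead of Sem r.
actionIO :: String -> IO Int
actionIO = pure . length

main :: IO ()
main = do
  -- At most 3 of the actions run at the same time.
  res <- pooledMapConcurrentlyN 3 actionIO ["foo", "bar", "baz"]
  print (sum res) -- prints 9
```

The problem in the question is therefore only about getting from Sem r down to IO, not about the pooling function itself.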

I see the following options:

  • Implement a MonadUnliftIO (Sem r) instance. I see that there were some discussions about adding/implementing such an instance in this GitHub issue. However, it's not clear to me whether it's a good idea to do so.
  • Use something other than pooledMapConcurrentlyN that gives me equivalent behavior. I know that there's parTraverse from the par-dual package, but that would require a ParDual instance. The parallel package might also offer a solution, but I'm not familiar enough with it to tell.
  • Model the parallel traverse as an effect. I tried it, but I couldn't manage to get an implementation for the effect. The effect definition I tried looks like this:
data ParTraverse m a where
  TraverseP :: (Traversable t) => Int -> (a -> m b) -> t a -> ParTraverse m (t b)

I'm not yet really familiar with either GADTs or Polysemy, so it's possible that I'm missing something obvious here.


EDIT: As pointed out in the answer below, the most appropriate solution is to model this as an effect and handle the concurrency in the effect interpretation as opposed to the business logic. This means that I'm looking for a higher order effect (?) similar to the ParTraverse effect above:

data ParTraverse m a where
  TraverseP :: (Traversable t) => (a -> m b) -> t a -> ParTraverse m (t b)

makeSem ''ParTraverse

parTraverseToIO :: (Member (Embed IO) r) => Sem (ParTraverse ': r) a -> Sem r a
parTraverseToIO = interpretH $ \case
  TraverseP f ta -> do
    _something

I'm not sure whether this type signature is correct or not (should the action have type a -> Sem r b? The signature for traverse has an Applicative constraint on m, how would I model that?)

There are 2 answers

Torsten Schmits (best answer)

As for the ParTraverse implementation, this is what I replied over on GitHub, for a version with t specialized to []:

pooledMapConcurrently :: Member (Final IO) r => Int -> (a -> Sem r b) -> [a] -> Sem r [Maybe b]
pooledMapConcurrently num f ta =
  ...

data ParTraverse m a where
  TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]

makeSem ''ParTraverse

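parTraverseToIO :: (Member (Final IO) r) => InterpreterFor ParTraverse r
parTraverseToIO =
  interpretH \case
    TraverseP f ta -> do
      -- Wrap each input in the interpreters' state functor.
      taT <- traverse pureT ta
      -- Lift f so that it accepts and returns values wrapped in that functor.
      fT <- bindT f
      -- Interpret the inner ParTraverse, then lift back into the Tactical environment.
      -- The pool size is hardcoded to 1 here; a larger bound is needed for actual concurrency.
      tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
      ins <- getInspectorT
      -- Drop missing results at both layers (catMaybes comes from Data.Maybe).
      pureT (catMaybes (inspect ins <$> catMaybes tb))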

Some explanations for the combinators used inside interpretH, where we operate in the Tactical environment:

  • Since we're dealing with a function a -> m b, where m is instantiated to Sem rInitial inside the interpreter, we have to use bindT to get a function that is something like f a -> Sem r (f b), with f being the monadic state of the interpreters.
  • We can't run pooledMapConcurrently on the Sem rInitial directly, because Member (Final IO) is only given for r.
  • ta contains the input for f, but since we lifted that to expect f a, we also have to call pureT on each element of ta, using traverse since it is a monadic action.
  • Functions produced by bindT (and runT) produce Sems that still have the current effect, ParTraverse, at the head, because the effect has to be interpreted within the wrapped Sem (passed in as a -> m b). This even allows using a different interpreter for the inner program. In our case, we simply run parTraverseToIO on the result of f again. After that, we have to lift this Sem back into the Tactical environment (which is just another effect at the head), so we use raise.
  • Since pooledMapConcurrently wraps each result in Maybe, every element of tb has type Maybe (f b), and we need to unpack this in order to get the return type right. For that, we can use the inspector, which transforms f to Maybe, giving us in effect Maybe (Maybe b) per element, which we can then flatten into a list with two applications of catMaybes.

For completeness, here's the implementation of pooledMapConcurrently, written by KingoftheHomeless:

pooledMapConcurrently :: (Member (Final IO) r, Traversable t) => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
pooledMapConcurrently i f t = withWeavingToFinal $ \s wv ins ->
  (<$ s) <$> pooledMapConcurrentlyIO i (\a -> ins <$> wv (f a <$ s)) t
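
To see how the pieces fit together, here is a sketch that assembles the definitions above into one module and runs the foo program from the question. It assumes a polysemy 1.x release, the LANGUAGE extensions listed at the top, and that pooledMapConcurrentlyIO is importable from UnliftIO.Internals.Async (an internal module of unliftio, so this may change between versions). The main function is an illustrative addition, not part of the original answer.

```haskell
{-# LANGUAGE BlockArguments, DataKinds, FlexibleContexts, GADTs, LambdaCase,
             PolyKinds, RankNTypes, ScopedTypeVariables, TemplateHaskell,
             TypeOperators #-}

import Data.Maybe (catMaybes)
import Polysemy
import Polysemy.Final (withWeavingToFinal)
-- Assumption: this internal unliftio module exposes the IO-specialized pool function.
import UnliftIO.Internals.Async (pooledMapConcurrentlyIO)

-- Run f over t with at most i threads; an element becomes Nothing if its effects failed.
pooledMapConcurrently
  :: (Member (Final IO) r, Traversable t)
  => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
pooledMapConcurrently i f t = withWeavingToFinal $ \s wv ins ->
  (<$ s) <$> pooledMapConcurrentlyIO i (\a -> ins <$> wv (f a <$ s)) t

data ParTraverse m a where
  TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]

makeSem ''ParTraverse

parTraverseToIO :: Member (Final IO) r => InterpreterFor ParTraverse r
parTraverseToIO = interpretH \case
  TraverseP f ta -> do
    taT <- traverse pureT ta
    fT <- bindT f
    -- Pool size 1 as in the answer; pass a larger bound for real concurrency.
    tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
    ins <- getInspectorT
    pureT (catMaybes (inspect ins <$> catMaybes tb))

action :: String -> Sem r Int
action = pure . length

-- The business logic only mentions the effect, not IO or threads.
foo :: Member ParTraverse r => Sem r Int
foo = sum <$> traverseP action ["foo", "bar", "baz"]

main :: IO ()
main = runFinal (parTraverseToIO foo) >>= print
```

Note that foo no longer needs a MonadUnliftIO instance; the choice of concurrency strategy lives entirely in the interpreter.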

Georgi Lyubenov

Then I would try:

  1. Don't do the concurrency as an effect in your business logic
  2. Use pooledMapConcurrentlyIO + embed in your interpreter

So you would have something like this

data GetThings m a where
  GetThings :: [InfoToFetchThing] -> GetThings m [Thing]

runGetThingsConcurrently :: Member (Embed IO) r => Sem (GetThings ': r) a -> Sem r a
runGetThingsConcurrently = interpret \case
  GetThings infos -> do
    ...
    embed $ pooledMapConcurrentlyIO 42 <fetch-action> infos

Of course, you can also customise this a lot: use a Traversable instead of a list, pass in <fetch-action> as an argument to the interpreter, pass in how many threads you want as an argument to your interpreter, etc.

Edit: since the action to be executed is also required to be in Sem r, and not in IO, you can use withWeavingToFinal to (potentially) get IO out of Sem r, as described in the link.
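
As a concrete illustration of the first-order approach, here is a small sketch applied to the string-length example from the question. The Lengths effect, getLengths, and runLengthsConcurrently are hypothetical names invented for this example. It assumes polysemy 1.x and unliftio, and uses the public pooledMapConcurrentlyN pinned to IO by a type annotation in place of pooledMapConcurrentlyIO.

```haskell
{-# LANGUAGE BlockArguments, DataKinds, FlexibleContexts, GADTs, LambdaCase,
             PolyKinds, RankNTypes, ScopedTypeVariables, TemplateHaskell,
             TypeOperators #-}

import Polysemy
import UnliftIO.Async (pooledMapConcurrentlyN)

-- Hypothetical first-order effect: the business logic only asks for the results.
data Lengths m a where
  GetLengths :: [String] -> Lengths m [Int]

makeSem ''Lengths

-- The thread bound is an interpreter argument; the per-element work is plain IO.
runLengthsConcurrently :: Member (Embed IO) r => Int -> Sem (Lengths ': r) a -> Sem r a
runLengthsConcurrently n = interpret \case
  GetLengths strs ->
    -- The annotation fixes the monad to IO so that embed can resolve Embed IO.
    embed (pooledMapConcurrentlyN n (pure . length) strs :: IO [Int])

foo :: Member Lengths r => Sem r Int
foo = sum <$> getLengths ["foo", "bar", "baz"]

main :: IO ()
main = runM (runLengthsConcurrently 3 foo) >>= print
```

This only works as long as the per-element action can be written in IO; when it has to live in Sem r, the withWeavingToFinal route mentioned in the edit is the alternative.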