Question

I am working with the Haskell pipes package.

I am trying to use pipes-concurrency to merge a list of Producers together.

What I want to arrive at is:

merge :: MonadIO m => [Producer a m ()] -> Producer a m ()

so given a producer s1 and another producer s2: r = merge [s1, s2] which would give the behaviour:

s1 --1--1--1--|
s2 ---2---2---2|
r  --12-1-21--2|

Following the code in the tutorial page I came up with:

mergeIO :: [Producer a IO ()] -> Producer a IO ()
mergeIO producers = do
    (output, input) <- liftIO $ spawn Unbounded
    _ <- liftIO $ mapM (fork output) producers
    fromInput input
  where
    fork :: Output a -> Producer a IO () -> IO ()
    fork output producer = void $ forkIO $ do runEffect $ producer >-> toOutput output
                                              performGC

which works as expected.

However I am having difficulty generalizing things.

My attempt:

merge :: (MonadIO m) => [Producer a m ()] -> Producer a m ()
merge producers = do
    (output, input) <- liftIO $ spawn Unbounded
    _ <- liftIO $ mapM (fork output) producers
    fromInput input
  where
    runEffectIO :: Monad m => Effect m r -> IO (m r)
    runEffectIO e = do
        x <- evaluate $ runEffect e
        return x
    fork output producer = forkIO $ do runEffectIO $ producer >-> toOutput output
                                       performGC

Unfortunately this compiles but does not do all too much else. I am guessing that I am making a mess of runEffectIO. Other approaches to my current runEffectIO have yielded no better results.

The program:

main = do
    let producer = merge [repeater 1 (100 * 1000), repeater 2 (150 * 1000)]
    _ <- runEffect $ producer >-> taker 20
  where repeater :: Int -> Int -> Producer Int IO r
        repeater val delay = forever $ do
            lift $ threadDelay delay
            yield val
        taker :: Int -> Consumer Int IO ()
        taker 0 = return ()
        taker n = do
            val <- await
            liftIO $ putStrLn $ "Taker " ++ show n ++ ": " ++ show val
            taker $ n - 1

hits val <- await but does not get to liftIO $ putStrLn thus it produces no output. However it exits fine without hanging.

When I substitute in mergeIO for merge then the program runs I would expect outputting 20 lines.

Was it helpful?

Solution

While MonadIO is not sufficient for this operation, MonadBaseControl (from monad-control) is designed to allow embedding arbitrary transformer stacks inside the base monad. The companion package lifted-base provides a version of fork which will work for transformer stacks. I've put together an example of using it to solve your problem in the following Gist, though the main magic is:

import qualified Control.Concurrent.Lifted as L
fork :: (MonadBaseControl IO m, MonadIO m) => Output a -> Producer a m () -> m ThreadId
fork output producer = L.fork $ do
    runEffect $ producer >-> toOutput output
    liftIO performGC

Note that you should understand what happens to monadic states when treated this way: modifications to any mutable state performed in the child threads will be isolated to just those child threads. In other words, if you were using a StateT, each child thread would start off with the same state value that was in context when it was forked, but then you would have many different states that do not update each other.

There's an appendix in the Yesod book on monad-control, though frankly it's a bit dated. I'm just not aware of any more recent tutorials.

OTHER TIPS

The problem seems to be your use of evaluate, which I assume it is the evaluate from Control.Exception.

You seem to be using it to "convert" a value inside the generic monad m into IO, but it doesn't really work that way. You are just obtaining the m value out of the Effect and then returning it inside IO without actually executing it. The following code doesn't print "foo":

evaluate (putStrLn "foo") >> return ""

Maybe your merge function could take as an additional parameter a function m a -> IO a so that merge knows how to bring the result of runEffect into IO.

Unfortunately, you can't fork a Producer with a MonadIO base monad (or any MonadIO computation for that matter). You need to specifically include the logic necessary to run all other monad transformers to get back an IO action before you can fork the computation.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top