Question

Here's some code that implements a small receiving server using conduit, network-conduit, and stm-conduit. It receives data on a socket and then streams it through an STM-channel to the main thread.

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class

import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.Binary as DCB
import Data.Conduit.Extra.Resumable
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)

import System.Directory (removeFile)
import System.IO

type BSChan = TBMChan ByteString

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ listen soc 2 >> loop where 
      loop = do
        (conn, _) <- accept soc
        sourceSocket conn $$ sinkTBMChan chan
        close conn
        loop

main :: IO ()
main = do
  soc <- socket AF_UNIX Stream 0
  bind soc (SockAddrUnix "mysock")
  socChan <- listenSocket soc 8
  sourceTBMChan socChan $$ DCB.sinkHandle stdout
  removeFile "mysock"

(In the real application, the stream of data from the socket gets merged with some others, which is why I don't handle it directly in the listener).

The problem is that, where I had expected this to stay open until the main thread is killed, instead it exits after the first message is received on the socket. I cannot work out why it does this, unless it's that the sink (on 2nd to last line) is exiting once it sees the end of the first stream of data. Can I persuade it not to do this? There's some stuff in Conduit about making a source resumable, but not a sink.

Was it helpful?

Solution

From the documention of sinkTBMChan:

When the sink is closed, the channel will close too.

So when the first socket handle closes, it causes the Source from sourceSocket to close, closing the connected sink which in turn closes the TBMChan which propagates to sinkHandle stopping the sink.

The simplest way to solve this is probably to change your loop into a custom source that doesn't close between connections and connect that source into the TBMChan.

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ do
      listen soc 2
      loop $$ sinkTBMChan chan

    loop = do
      (conn, _) <- liftIO $ accept soc
      sourceSocket conn
      liftIO $ close conn
      loop

OTHER TIPS

Coordinating shutdown of writers and readers from a channel is a non-trivial problem, but you can reuse a solution from the pipes ecosystem to solve this, which is to use the pipes-concurrency library. This library provides several pipes-independent utilities that you can reuse with conduit libraries for communicating between readers and writers so that each side automatically correctly knows when to clean up and you can manually clean up either side as well, too.

The key function that you use from the pipes-concurrency library is spawn. Its type is:

spawn :: Buffer a -> IO (Output a, Input a)

The Buffer specifies what underlying STM channel abstraction to use. Judging by your example code, it sounds like you want a Bounded buffer:

spawn (Bounded 8) :: IO (Output a, Input a)

The a can be anything in this case, so it can be a ByteString, for example:

spawn (Bounded 8) :: IO (Output ByteString, Input ByteString)

The Input and Output behave like a mailbox. You add messages to the mailbox by sending data to the Outputs and you take messages out of the mailbox (in FIFO order) by recving data from Inputs:

-- Returns `False` if the mailbox is sealed
send :: Output a -> a -> STM Bool

-- Returns `Nothing` if the mailbox is sealed
recv :: Input a -> STM (Maybe a)

The neat feature of pipes-concurrency is that it instruments the garbage collector to automatically seal the mailbox if there either no readers or no writers to the mailbox. This avoids a common source of deadlocks.

If you were using the pipes ecosystem you would normally use the following two higher-level utilities to read and write to mailbox.

-- Stream values into the mailbox until it is sealed
toOutput :: Output a -> Consumer a IO ()

-- Stream values from the mailbox until it is sealed
fromInput :: Input a -> Producer a IO ()

However, because the core machinery is pipes-independent you can rewrite equivalent conduit versions of these functions:

import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Pipes.Concurrent

toOutput' :: Output a -> Sink a IO ()
toOutput' o = awaitForever (\a -> lift $ atomically $ send o a)

fromInput' :: Input a -> Source IO a
fromInput' i = do
    ma <- lift $ atomically $ recv i
    case ma of
        Nothing -> return ()
        Just a  -> do
            yield a
            fromInput' i

Then your main function would look like something like this:

main :: IO ()
main = do
    soc <- socket AF_UNIX Stream 0
    bind soc (SockAddrUnix "mysock")
    (output, input) <- spawn (Bounded 8)
    forkIO $ readFromSocket soc $$ toOutput output
    fromInput input $$ DCB.sinkHandle stdout
  removeFile "mysock"

... where readFromSocket would be some Source that reads from your Socket.

You can then freely write to the output using other sources of data, too, and not worry about having to coordinate them or dispose of the input or output properly when you are done.

To learn more about pipes-concurrency, I recommend reading the official tutorial.

I think @shang's answer is the correct one, I'd just go a bit farther and say that the behavior of writeTBMChan looks like the better culprit here. I'd recommend changing it to not automatically close the TBMChan. A simple implementation of this idea is:

sinkTBMChan chan = awaitForever $ liftIO . atomically . writeTBMChan chan

If you use that in your program, it will work as expected.

So, here's one answer which doesn't involve creating a resumable sink. The sourceSocket in network-conduit allows a single connection, but we can implement the reconnect behaviour inside sourceSocket (apologies for the code, I think it needs cleaning, but at least it works!):

sourceSocket :: (MonadIO m) => Socket -> Producer m ByteString
sourceSocket sock =
    loop
  where
    loop = do
      (conn, _) <- lift . liftIO $ accept sock
      loop' conn
      lift . liftIO $ close conn
      loop
    loop' conn = do
      bs <- lift . liftIO $ recv conn 4096
      if B.null bs
        then return ()
        else yield bs >> loop' conn

One problem here is that this never exits (until the program dies). This isn't a problem in my use case, since the socket should remain listening for the life of the program.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top