Coordinating the shutdown of writers and readers on a channel is a non-trivial problem, but you can reuse a solution from the pipes ecosystem: the pipes-concurrency library. It provides several pipes-independent utilities that you can reuse with conduit libraries for communicating between readers and writers. Each side automatically knows when to clean up, and you can also clean up either side manually.
The key function that you use from the pipes-concurrency library is spawn. Its type is:
spawn :: Buffer a -> IO (Output a, Input a)
The Buffer specifies what underlying STM channel abstraction to use. Judging by your example code, it sounds like you want a Bounded buffer:
spawn (Bounded 8) :: IO (Output a, Input a)
The a can be anything in this case, so it can be a ByteString, for example:
spawn (Bounded 8) :: IO (Output ByteString, Input ByteString)
The Input and Output behave like a mailbox. You add messages to the mailbox by calling send on the Output, and you take messages out of the mailbox (in FIFO order) by calling recv on the Input:
-- Returns `False` if the mailbox is sealed
send :: Output a -> a -> STM Bool
-- Returns `Nothing` if the mailbox is sealed
recv :: Input a -> STM (Maybe a)
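As a minimal sketch of the mailbox behaviour described above, the following sends three values and reads them back. The name roundTrip is hypothetical, and the sketch assumes pipes-concurrency 2.x, where spawn returns the Output first and Bounded is a constructor of Buffer.

```haskell
import Control.Monad (replicateM)
import Pipes.Concurrent (Buffer (..), atomically, recv, send, spawn)

-- Hypothetical helper: push three values into a mailbox, then pull them out.
roundTrip :: IO [Maybe Int]
roundTrip = do
    (output, input) <- spawn (Bounded 8)
    -- The buffer holds up to 8 values, so these three sends do not block.
    mapM_ (atomically . send output) [1, 2, 3]
    -- Values come back in the order they were sent (FIFO).
    replicateM 3 (atomically (recv input))
```

Calling roundTrip from GHCi or from your own main should give back the three values wrapped in Just, in sending order.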
The neat feature of pipes-concurrency is that it instruments the garbage collector to automatically seal the mailbox if there are either no readers or no writers left for the mailbox. This avoids a common source of deadlocks.
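Sealing does not have to wait for the garbage collector. The sketch below assumes the spawn' variant from pipes-concurrency 2.x, which returns a third component: an STM action that seals the mailbox by hand. The name sealDemo is hypothetical.

```haskell
import Pipes.Concurrent (Buffer (..), atomically, recv, send, spawn')

-- Hypothetical helper showing what send and recv report around a manual seal.
sealDemo :: IO (Bool, Bool, Maybe Int, Maybe Int)
sealDemo = do
    (output, input, seal) <- spawn' (Bounded 8)
    before <- atomically (send output 1)  -- mailbox still open
    atomically seal                       -- seal it explicitly
    after  <- atomically (send output 2)  -- rejected: mailbox is sealed
    first  <- atomically (recv input)     -- the buffered value can still be read
    second <- atomically (recv input)     -- buffer empty and sealed
    return (before, after, first, second)
```

Under these assumptions the result is (True, False, Just 1, Nothing): writes stop as soon as the mailbox is sealed, while the reader still drains whatever was already buffered before it sees Nothing.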
If you were using the pipes ecosystem, you would normally use the following two higher-level utilities to read from and write to the mailbox:
-- Stream values into the mailbox until it is sealed
toOutput :: Output a -> Consumer a IO ()
-- Stream values from the mailbox until it is sealed
fromInput :: Input a -> Producer a IO ()
However, because the core machinery is pipes-independent, you can write equivalent conduit versions of these functions:
import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Pipes.Concurrent
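
toOutput' :: Output a -> Sink a IO ()
toOutput' o = loop
  where
    loop = do
        ma <- await
        case ma of
            Nothing -> return ()
            Just a  -> do
                -- `send` returns `False` once the mailbox is sealed, so stop then
                open <- lift $ atomically $ send o a
                if open then loop else return ()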

fromInput' :: Input a -> Source IO a
fromInput' i = do
    ma <- lift $ atomically $ recv i
    case ma of
        Nothing -> return ()
        Just a  -> do
            yield a
            fromInput' i
Then your main function would look something like this:
main :: IO ()
main = do
    soc <- socket AF_UNIX Stream 0
    bind soc (SockAddrUnix "mysock")
    (output, input) <- spawn (Bounded 8)
    -- Use the conduit adapters defined above, not the pipes versions
    _ <- forkIO $ readFromSocket soc $$ toOutput' output
    fromInput' input $$ DCB.sinkHandle stdout
    removeFile "mysock"
... where readFromSocket would be some Source that reads from your Socket.
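To try the same wiring without a socket, here is a self-contained sketch that replaces the socket source with a plain list and the stdout sink with a list collector. It assumes conduit 1.3 or later (ConduitT, runConduit, and .| instead of Source, Sink, and $$) and pipes-concurrency 2.x. The name relay is hypothetical, and the explicit seal via spawn' is a choice made here so that the reader stops deterministically rather than waiting for the garbage collector.

```haskell
import Control.Monad.IO.Class (liftIO)
import Data.Conduit (ConduitT, await, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL
import Pipes.Concurrent
    (Buffer (..), Input, Output, atomically, forkIO, recv, send, spawn')

-- Feed upstream values into the mailbox; stop if the mailbox gets sealed.
toOutput' :: Output a -> ConduitT a o IO ()
toOutput' o = loop
  where
    loop = do
        ma <- await
        case ma of
            Nothing -> return ()
            Just a -> do
                open <- liftIO (atomically (send o a))
                if open then loop else return ()

-- Yield values from the mailbox until it is sealed and drained.
fromInput' :: Input a -> ConduitT i a IO ()
fromInput' i = do
    ma <- liftIO (atomically (recv i))
    case ma of
        Nothing -> return ()
        Just a -> yield a >> fromInput' i

-- Hypothetical helper: a writer thread streams a list through the mailbox
-- and the calling thread collects whatever arrives.
relay :: [Int] -> IO [Int]
relay xs = do
    (output, input, seal) <- spawn' (Bounded 8)
    _ <- forkIO $ do
        runConduit (CL.sourceList xs .| toOutput' output)
        atomically seal  -- tell the reader that no more data is coming
    runConduit (fromInput' input .| CL.consume)
```

With a single writer, relay should return the input list unchanged, since the mailbox is FIFO and the reader drains it before seeing the seal.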
You can then freely write to the output from other sources of data, too, without having to coordinate them or dispose of the input or output properly when you are done.
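A small sketch of several writers sharing one output, assuming pipes-concurrency 2.x. The name fanIn is hypothetical. The reader takes exactly six values here only to keep the example deterministic, and the result is sorted because the interleaving of the two writers is not fixed.

```haskell
import Control.Monad (replicateM)
import Data.List (sort)
import Data.Maybe (catMaybes)
import Pipes.Concurrent (Buffer (..), atomically, forkIO, recv, send, spawn)

-- Hypothetical helper: two independent writer threads feed the same mailbox.
fanIn :: IO [Int]
fanIn = do
    (output, input) <- spawn (Bounded 8)
    -- Neither writer knows about the other; both just send to the same Output.
    _ <- forkIO (mapM_ (atomically . send output) [1, 2, 3])
    _ <- forkIO (mapM_ (atomically . send output) [10, 20, 30])
    -- Read the six values that the two writers produce between them.
    received <- replicateM 6 (atomically (recv input))
    return (sort (catMaybes received))
```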
To learn more about pipes-concurrency, I recommend reading the official tutorial.