Finalization in Pipes-2.1.0 package
20-06-2021
Question
I am using the Pipes-2.1.0 package and the zeromq3-haskell package to construct a little message pipeline. Everything seems to be going well except that I am having trouble understanding finalization of Frames.
In the following Frame I acquire two resources: a zeromq context and a zeromq socket. Then I continuously wait for messages (in the form of ByteStrings) to publish on the zeromq socket.
{-# LANGUAGE RebindableSyntax #-}
{-# LANGUAGE ScopedTypeVariables #-}
module PipesZeroMQ where

import Control.Frame
import Control.IMonad.Do
import Control.IMonad.Trans
import qualified Control.Monad as M
import Data.ByteString (ByteString)
import Data.String
import Prelude hiding (Monad(..))
import qualified System.ZMQ3 as ZMQ

type Address = String

fromList :: (M.Monad m) => [b] -> Frame b m (M a) (M a) ()
fromList xs = mapMR_ yield xs

publisher :: Address -> Frame Void IO (M ByteString) C ()
publisher addr = do
    c <- liftU $ ZMQ.init 1
    s <- liftU $ ZMQ.socket c ZMQ.Pub
    liftU $ ZMQ.bind s addr
    liftU $ print "Socket open for business!!!"
    foreverR $ do
        bs <- await
        finallyF (ZMQ.close s M.>> ZMQ.term c M.>> print "ZMQ socket closed") $ do
            (liftU $ ZMQ.send s [] bs)
            (liftU (print "Sending message"))
Now if I try this:
λ> runFrame $ (publisher localAddress) <-< (fromList ["This", "that", "that"] >> close)
I get this:
"Socket open for business"
"Sending message"
"ZMQ socket closed"
*** Exception: ZMQError { errno = 88, source = "send", message = "Socket operation on non-socket" }
publisher finalizes after receiving just one ByteString.
Why is this happening?
What am I misunderstanding about finalization using Frames in Pipes-2.1.0?
Does the tree outside stand a chance if I start attacking it?
Solution
You made a mistake when writing the publisher function:
foreverR $ do
    bs <- await
    finallyF (ZMQ.close s M.>> ZMQ.term c M.>> print "ZMQ socket closed") $ do
        (liftU $ ZMQ.send s [] bs)
        (liftU (print "Sending message"))
You probably wanted to place the finallyF outside the foreverR loop:
finallyF (...) $ foreverR $ do
    bs <- await
    liftU $ ZMQ.send s [] bs
    liftU (print "Sending message")
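The way you wrote it, the finalizer runs after every send, which is exactly what the code asks for. finallyF calls the finalizer once the action it wraps is complete, whether that action terminates successfully or unsuccessfully. Here the wrapped action is a single send, so the socket is already closed when the loop comes around for the second message, and that is where the "Socket operation on non-socket" error comes from. With the finalizer outside the loop, you could also use catchF, since the loop never terminates on its own anyway: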
catchF (...) $ foreverR $ do
    bs <- await
    liftU $ ZMQ.send s [] bs
    liftU (print "Sending message")
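The effect of the finalizer's placement can be reproduced without Pipes at all. What follows is only an analogy, a minimal sketch that assumes `finally` from base's Control.Exception as a stand-in for finallyF and an IORef log as a stand-in for the real socket. The names logEvent, finalizerInside and finalizerOutside are hypothetical and exist only for this illustration; none of them come from Pipes-2.1.0 or zeromq3-haskell.

```haskell
import Control.Exception (finally)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Append one event to a shared log (stand-in for real socket I/O).
logEvent :: IORef [String] -> String -> IO ()
logEvent ref msg = modifyIORef ref (++ [msg])

-- Finalizer wrapped around each iteration: "close" runs after every send.
finalizerInside :: [String] -> IO [String]
finalizerInside msgs = do
    ref <- newIORef []
    mapM_ (\m -> logEvent ref ("send " ++ m) `finally` logEvent ref "close") msgs
    readIORef ref

-- Finalizer wrapped around the whole loop: "close" runs once, at the end.
finalizerOutside :: [String] -> IO [String]
finalizerOutside msgs = do
    ref <- newIORef []
    mapM_ (\m -> logEvent ref ("send " ++ m)) msgs `finally` logEvent ref "close"
    readIORef ref

main :: IO ()
main = do
    finalizerInside  ["This", "that"] >>= print
    -- prints ["send This","close","send that","close"]
    finalizerOutside ["This", "that"] >>= print
    -- prints ["send This","send that","close"]
```

In the first log the "close" event precedes the second send, which mirrors the situation in the question: the socket is closed before the next message arrives.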
Alternatively, you could have kept it inside the loop but switched to catchF so that the finalizer doesn't get run after each send:
foreverR $ do
    bs <- await
    catchF (ZMQ.close s M.>> ZMQ.term c M.>> print "ZMQ socket closed") $ do
        (liftU $ ZMQ.send s [] bs)
        (liftU (print "Sending message"))
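A rough way to picture the difference between the two combinators is the difference between `finally` and `onException` in base's Control.Exception. This is an analogy under stated assumptions, not the Pipes-2.1.0 implementation: it assumes, as the answer implies, that catchF only runs its finalizer when the wrapped block is cut short, and it models "cut short" with an ordinary exception. The helper names logEvent and runWith are hypothetical.

```haskell
import Control.Exception
    (ErrorCall (..), SomeException, finally, onException, throwIO, try)
import Control.Monad (when)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Append one event to a shared log (stand-in for real socket I/O).
logEvent :: IORef [String] -> String -> IO ()
logEvent ref msg = modifyIORef ref (++ [msg])

-- Run one "send" under the given wrapper (finally or onException),
-- optionally interrupting it, and return the resulting event log.
runWith :: (IO () -> IO () -> IO ()) -> Bool -> IO [String]
runWith wrap interrupted = do
    ref <- newIORef []
    let body = do
            logEvent ref "send"
            when interrupted (throwIO (ErrorCall "interrupted"))
    -- Swallow the simulated interruption so that the log can be inspected.
    _ <- try (body `wrap` logEvent ref "close") :: IO (Either SomeException ())
    readIORef ref

main :: IO ()
main = do
    runWith finally     False >>= print  -- prints ["send","close"]
    runWith finally     True  >>= print  -- prints ["send","close"]
    runWith onException False >>= print  -- prints ["send"]
    runWith onException True  >>= print  -- prints ["send","close"]
```

Only the `onException` variant skips the cleanup when the body completes normally, which is the behaviour wanted for a per-message block inside a loop that should keep the socket open.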
Also, if you are planning on writing a ZeroMQ library based on pipes, keep in touch with me. In the next release I'm planning to turn frames back into an ordinary monad, along with a lot of new functionality, such as the ability to close and reinitialize resources. To reach me, use my gmail.com address with username Gabriel439.