Question

I am using the Pipes-2.1.0 package and the zeromq3-haskell package to construct a little message pipeline. Everything seems to be going well except that I am having trouble understanding finalization of Frames.

In the following Frame I acquire two resources: a ZeroMQ context and a ZeroMQ socket. Then I wait continuously for messages (in the form of ByteStrings) to publish on the socket.

{-# LANGUAGE RebindableSyntax    #-}
{-# LANGUAGE ScopedTypeVariables #-}

module PipesZeroMQ where

import           Control.Frame
import           Control.IMonad.Do
import           Control.IMonad.Trans
import qualified Control.Monad          as M
import           Data.ByteString        (ByteString)
import           Data.String
import           Prelude                hiding (Monad(..))
import qualified System.ZMQ3            as ZMQ

type Address = String

fromList :: (M.Monad m) => [b] -> Frame b m (M a) (M a) ()
fromList xs = mapMR_ yield xs

publisher :: Address -> Frame Void IO (M ByteString) C ()
publisher addr = do
  c  <- liftU $ ZMQ.init 1
  s  <- liftU $ ZMQ.socket c ZMQ.Pub
  liftU $ ZMQ.bind s addr
  liftU $ print "Socket open for business!!!"

  foreverR $ do
    bs <- await
    finallyF (ZMQ.close s M.>> ZMQ.term c M.>> print "ZMQ socket closed") $ do
         (liftU $ ZMQ.send s [] bs)
         (liftU (print "Sending message"))

Now if I try this:

λ> runFrame $ (publisher localAddress) <-< (fromList ["This", "that", "that"] >> close)

I get this:

"Socket open for business"
"Sending message"
"ZMQ socket closed"
*** Exception: ZMQError { errno = 88, source = "send", message = "Socket operation on non-socket" }

publisher finalizes after receiving just one ByteString.

Why is this happening?

What am I misunderstanding about finalization using Frames in Pipes-2.1.0?

Does the tree outside stand a chance if I start attacking it?

Solution

You made a mistake when writing the publisher function:

foreverR $ do
    bs <- await
    finallyF (ZMQ.close s M.>> ZMQ.term c M.>> print "ZMQ socket closed") $ do
         (liftU $ ZMQ.send s [] bs)
         (liftU (print "Sending message"))

You probably wanted to place the finallyF OUTSIDE the foreverR loop:

finallyF (...) $ foreverR $ do
    bs <- await
    liftU $ ZMQ.send s [] bs
    liftU (print "Sending message")

The way you wrote it, the finalizer runs after every send, which is exactly what the code asks for. finallyF runs the finalizer once the action it wraps completes, whether that action terminates successfully or unsuccessfully. Since the loop never terminates on its own anyway, you could also use catchF in that position:

catchF (...) $ foreverR $ do
    bs <- await
    liftU $ ZMQ.send s [] bs
    liftU (print "Sending message")

Alternatively, you could have kept it inside the loop but switched to catchF so that the finalizer doesn't get run after each send:

foreverR $ do
    bs <- await
    catchF (ZMQ.close s M.>> ZMQ.term c M.>> print "ZMQ socket closed") $ do
         (liftU $ ZMQ.send s [] bs)
         (liftU (print "Sending message"))
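
The difference between these placements can be sketched in plain IO, without Frames. This is only an analogy and makes some assumptions: it uses finally and onException from base's Control.Exception as rough stand-ins for finallyF and catchF, and a hypothetical logEvent helper that records events in an IORef instead of touching a real socket. It does not model how Frames finalize when another stage of the pipeline terminates.

```haskell
import Control.Exception (finally, onException)
import Control.Monad (forM_)
import Data.IORef (IORef, newIORef, modifyIORef', readIORef)

-- Hypothetical stand-in for socket operations: append an event to a log.
logEvent :: IORef [String] -> String -> IO ()
logEvent ref msg = modifyIORef' ref (++ [msg])

-- Finalizer wrapped around each send: "close" runs after every message.
insideLoop :: IORef [String] -> [String] -> IO ()
insideLoop ref msgs =
  forM_ msgs $ \m ->
    logEvent ref ("send " ++ m) `finally` logEvent ref "close"

-- Finalizer wrapped around the whole loop: "close" runs once, at the end.
outsideLoop :: IORef [String] -> [String] -> IO ()
outsideLoop ref msgs =
  forM_ msgs (\m -> logEvent ref ("send " ++ m)) `finally` logEvent ref "close"

-- Failure-only handler: "close" runs only if the loop throws.
onFailureOnly :: IORef [String] -> [String] -> IO ()
onFailureOnly ref msgs =
  forM_ msgs (\m -> logEvent ref ("send " ++ m)) `onException` logEvent ref "close"

main :: IO ()
main = do
  r1 <- newIORef []
  insideLoop r1 ["a", "b"]
  readIORef r1 >>= print  -- ["send a","close","send b","close"]

  r2 <- newIORef []
  outsideLoop r2 ["a", "b"]
  readIORef r2 >>= print  -- ["send a","send b","close"]

  r3 <- newIORef []
  onFailureOnly r3 ["a", "b"]
  readIORef r3 >>= print  -- ["send a","send b"]
```

The first log has the same shape as the failure in the question: the resource is closed after the first message, so the next send has nothing to write to.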

Also, if you are planning on writing a ZeroMQ library based on pipes, keep in touch with me: in the next release I plan to turn frames back into an ordinary monad and add a lot of new functionality, such as the ability to close and reinitialize resources. To reach me, use my gmail.com address with username Gabriel439.

Licensed under: CC-BY-SA with attribution