Question

I have a main thread that writes to a Map and a PSQ. In both the Map and the PSQ I use the same keys so that by looking at the PSQ the entry with min priority can be found with O(1) complexity and be mapped to a value in the Map.

Now, whilst my main thread adds to or modifies both the Map and the PSQ when required, I have a second thread that constantly (forever $ do) polls the PSQ to check whether the oldest key is more than N ms old and, if so, is supposed to flush it.

For this to happen both threads need to look at the same mutable data. What is the best way to maintain state here? Would this be a case for IORefs? What other approaches are possible?

"Some" pre-alpha code here:

import Data.Time
import Data.Functor
import Data.Time.Clock.POSIX
import qualified Data.PSQueue as PSQ
import qualified Data.Map as Map
import Data.Maybe
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as B 

--PSQ = (host, PID) POSIXTime
--where the tuple is k and POSIXTime is p

--Map is (host, PortNumber) [messages]
--where the tuple is the key and [messages] is a list of messages

key = ("192.168.1.1", 4711)
messages = ["aaa", "bbbb", "ccccc"]

newRq :: IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
newRq = do
      time <- getPOSIXTime
      let q = PSQ.singleton key time
      let m = Map.singleton key messages
      return (q, m)

appendMsg :: String -> (String, Integer) -> Map.Map (String, Integer) [String] -> Map.Map (String, Integer) [String]
appendMsg newmsgs (host, port) m =
      let Just messages' = Map.lookup (host,port) m
          l = length . concat $ messages'
          l' = l + length newmsgs
      in 
      if l' < 1400 then Map.adjust (++ [newmsgs]) (host, port) m else m

insertNewRec :: (String, Integer) -> [String] -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
insertNewRec (a,b) c q m = do
      time <- getPOSIXTime
      let q1 = PSQ.insert (a,b) time q
      let m1 = Map.insert (a,b) c m
      return (q1, m1)

sendq :: Socket -> B.ByteString -> String -> PortNumber -> IO ()
sendq s datastring host port = do
      hostAddr <- inet_addr host
      sendAllTo s datastring (SockAddrInet port hostAddr)
      return ()

deleteRec :: (String, Integer) -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
deleteRec (host, port) q m = (q', m')
      where 
         m' = Map.delete (host, port) m
         q' = PSQ.delete (host, port) q

loopMyQ q m1 done = forever $ do 
      let Just m = PSQ.findMin q
      let time = (PSQ.prio m) + 0.200 --adds 200ms
      now <- getPOSIXTime
      if now < time
        then print (m1) 
        --here eventually I would call the send function to flush the queue
        else putMVar done ()

sendrecv :: Socket -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> String -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String])) 
sendrecv s q1 m1 msg = do
     let m2 = appendMsg msg key m1
         (q3, m3) = case m2 of   
                   val | m2 == m1 -> deleteRec key q1 m1
                       | otherwise -> (q1, m2)
     (q5, m5) <- if (m2 == m1) then (do (q4, m4) <- insertNewRec key (words msg) q3 m3
                                        return (q4, m4)) else return (q1, m2)
     when (m2 == m1) (let Just messages = Map.lookup ("192.168.1.1", 4711) m1 in sendq s (B.pack $ unwords messages) "192.168.1.1" 4711)
     return (q5, m5)

--main :: IO()
main = withSocketsDo $ do
     s <- socket AF_INET Datagram defaultProtocol
     (q1, m1) <- newRq
     done <- newEmptyMVar
     forkIO $ loopMyQ q1 m1 done
     (q', m') <- foldM (\(q, m) _ -> sendrecv s q m "ping") (q1, m1) [1..1000]
     takeMVar done
     --print ("longer than 200ms ago")

Solution

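You most likely want to use MVars or TVars to maintain consistent state across threads. An IORef is only safe to share if every update goes through atomicModifyIORef, and even then it cannot keep two separate references consistent with each other.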

I recommend using STM (and TVars) for this problem. You're dealing with concurrent access to multiple data structures, and the composability of STM is much easier to deal with than having to think about lock order with MVars.

After looking at your code, it seems like TVars would be your best bet. Wrap your PSQ and Map in two different TVars. Wrap all code that needs a consistent view of both in an atomically transaction. In most cases, your code will 'just work'. There are no locks for you to manage: if another thread commits a conflicting change while a transaction is running, the atomically block is simply rerun until it commits.
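As a rough illustration of the two-TVar layout described above, here is a minimal sketch, not a drop-in replacement for the code in the question. The names Shared, newShared, addMsg, takeExpired and flusher are made up for this example, and the 200 ms age and 10 ms polling interval are assumptions. It relies on the stm, containers, time and PSQueue packages.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM
import Control.Monad (forever)
import qualified Data.Map as Map
import qualified Data.PSQueue as PSQ
import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime)

type Key = (String, Integer)

-- Hypothetical record bundling the two shared structures.
data Shared = Shared
  { sharedQ :: TVar (PSQ.PSQ Key POSIXTime)
  , sharedM :: TVar (Map.Map Key [String])
  }

newShared :: IO Shared
newShared = Shared <$> newTVarIO PSQ.empty <*> newTVarIO Map.empty

-- Writer side: append a message to the key's list and, if the key is
-- not queued yet, stamp it with the given time.
addMsg :: Shared -> POSIXTime -> Key -> String -> STM ()
addMsg sh now k msg = do
  modifyTVar' (sharedM sh) (Map.insertWith (flip (++)) k [msg])
  q <- readTVar (sharedQ sh)
  case PSQ.lookup k q of
    Nothing -> writeTVar (sharedQ sh) (PSQ.insert k now q)
    Just _  -> return ()

-- Flusher side: if the oldest entry is at least maxAge old, remove it
-- from both structures and return it; otherwise change nothing.
takeExpired :: Shared -> POSIXTime -> POSIXTime -> STM (Maybe (Key, [String]))
takeExpired sh maxAge now = do
  q <- readTVar (sharedQ sh)
  case PSQ.minView q of
    Just (b, q') | PSQ.prio b + maxAge <= now -> do
      m <- readTVar (sharedM sh)
      let k = PSQ.key b
      writeTVar (sharedQ sh) q'
      writeTVar (sharedM sh) (Map.delete k m)
      return (Just (k, Map.findWithDefault [] k m))
    _ -> return Nothing

-- Polling loop for the second thread; 'send' is whatever flushes a batch.
flusher :: Shared -> (Key -> [String] -> IO ()) -> IO ()
flusher sh send = forever $ do
  now <- getPOSIXTime
  r <- atomically (takeExpired sh 0.2 now)
  case r of
    Just (k, msgs) -> send k msgs
    Nothing        -> threadDelay 10000  -- assumed 10 ms polling interval
```

The main thread would call something like atomically (addMsg sh now key "ping") after reading the clock, and the second thread would be started with forkIO (flusher sh send). Because each function touches both TVars inside a single transaction, neither thread can observe a key that is present in one structure but missing from the other. The timestamp is passed in as an argument because getPOSIXTime is an IO action and cannot run inside STM.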

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow