Question

I am working with a particular database where, upon a successful query, you are able to access a group of chunks of the resulting data using a specific command:

getResultData :: IO (ResponseCode, ByteString)

Now getResultData will return a response code and some data where the response codes look like this:

data ResponseCode = GET_DATA_FAILED | OPERATION_SUCCEEDED | NO_MORE_DATA

The ByteString is one, some, or all of the chunks:

Data http://desmond.imageshack.us/Himg189/scaled.php?server=189&filename=chunksjpeg.png&res=medium

The story does not end here. There exists a stream of groups:

Stream http://desmond.imageshack.us/Himg695/scaled.php?server=695&filename=chunkgroupsjpeg.png&res=medium

Upon receiving a NO_MORE_DATA response from getResultData, a call to getNextItem advances the stream to the next group, allowing me to start calling getResultData again. Once getNextItem returns STREAM_FINISHED, that's all she wrote; I have my data.

Now, I wish to model this with either Data.Iteratee or Data.Enumerator. My existing Data.Iteratee solution works, but it seems very naive, and I feel I should be modeling this with nested iteratees rather than the one big iteratee blob I have now.

I have been looking at the code of Data.Iteratee 0.8.6.2 and I am a bit confused when it comes to the nested stuff.

Are nested iteratees the proper course of action? If so, how would one model this with nested iteratees?

Regards


Solution

I think nested iteratees are the correct approach, but this case has some unique problems which make it slightly different from most common examples.

Chunks and groups

The first problem is to get the data source right. Basically the logical divisions you've described would give you a stream equivalent to [[ByteString]]. If you create an enumerator to produce this directly, each element within the stream would be a full group of chunks, which presumably you wish to avoid (for memory reasons). You could flatten everything into a single [ByteString], but then you'd need to re-introduce boundaries, which would be pretty wasteful since the db is doing it for you.
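To make the [[ByteString]] shape concrete, here is a minimal sketch that mocks the database with an IORef and drains it naively into nested lists. Everything prefixed with mock, the MockDb type, and the NextCode type (including the name NEXT_ITEM_READY) are assumptions made for illustration; only STREAM_FINISHED and the three response codes come from the question. It also assumes, as the code further down does, that the last chunk of a group arrives together with NO_MORE_DATA.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import qualified Data.ByteString.Char8 as B

data ResponseCode = GET_DATA_FAILED | OPERATION_SUCCEEDED | NO_MORE_DATA
  deriving (Eq, Show)

-- Hypothetical result type for getNextItem; the question only names
-- STREAM_FINISHED, so NEXT_ITEM_READY is an invented placeholder.
data NextCode = NEXT_ITEM_READY | STREAM_FINISHED
  deriving (Eq, Show)

-- Mock database state: the remaining groups; the head is the current group.
type MockDb = IORef [[B.ByteString]]

-- Hand out one chunk of the current group per call.
mockGetResultData :: MockDb -> IO (ResponseCode, B.ByteString)
mockGetResultData ref = do
  groups <- readIORef ref
  case groups of
    []                -> return (GET_DATA_FAILED, B.empty)
    ([] : _)          -> return (NO_MORE_DATA, B.empty)
    ([c] : rest)      -> writeIORef ref ([] : rest) >> return (NO_MORE_DATA, c)
    ((c : cs) : rest) -> writeIORef ref (cs : rest) >> return (OPERATION_SUCCEEDED, c)

-- Drop the current group and move on to the next one, if any.
mockGetNextItem :: MockDb -> IO NextCode
mockGetNextItem ref = do
  groups <- readIORef ref
  case drop 1 groups of
    []   -> writeIORef ref [] >> return STREAM_FINISHED
    rest -> writeIORef ref rest >> return NEXT_ITEM_READY

-- Collect every chunk of the current group (holds the whole group in memory).
drainGroup :: MockDb -> IO [B.ByteString]
drainGroup db = do
  (code, bytes) <- mockGetResultData db
  case code of
    OPERATION_SUCCEEDED -> fmap (bytes :) (drainGroup db)
    NO_MORE_DATA        -> return [bytes | not (B.null bytes)]
    GET_DATA_FAILED     -> ioError (userError "GET_DATA_FAILED")

-- Collect every group of the stream: the [[ByteString]] view.
drainStream :: MockDb -> IO [[B.ByteString]]
drainStream db = do
  g    <- drainGroup db
  next <- mockGetNextItem db
  case next of
    NEXT_ITEM_READY -> fmap (g :) (drainStream db)
    STREAM_FINISHED -> return [g]

demo :: IO [[B.ByteString]]
demo = do
  db <- newIORef [map B.pack ["a", "b"], map B.pack ["c"], map B.pack ["d", "e", "f"]]
  drainStream db
```

This is exactly the version you'd want to avoid for large results, since each group is materialised in full before anything can consume it; it is only meant to pin down the protocol that the enumerators have to follow.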

Ignoring the stream of groups for now, it appears that you need to divide the data into chunks yourself. I would model this as:

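enumGroup :: Enumerator ByteString IO a
enumGroup = enumFromCallback cb ()
 where
  cb () = do
    -- `data` is a reserved word in Haskell, so the payload is bound as `bytes`
    (code, bytes) <- getResultData
    case code of
        -- True: more data may follow; False: this group is exhausted
        OPERATION_SUCCEEDED -> return $ Right ((True, ()), bytes)
        NO_MORE_DATA        -> return $ Right ((False, ()), bytes)
        -- MyException is a placeholder; a custom exception type will
        -- probably need wrapping with toException here
        GET_DATA_FAILED     -> return $ Left MyException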

Since chunks are of a fixed size, you can easily chunk this with Data.Iteratee.group.

enumGroupChunked :: Iteratee [ByteString] IO a -> IO (Iteratee ByteString IO a)
enumGroupChunked = enumGroup . joinI . group groupSize
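As a rough picture of what the re-chunking step does to the data, here is a pure sketch over plain lists. The function rechunk is a hypothetical stand-in written for this illustration, not the implementation of Data.Iteratee's group, and unlike an iteratee it concatenates all of its input up front rather than streaming.

```haskell
import qualified Data.ByteString.Char8 as B

-- Regroup arbitrarily sized pieces into chunks of exactly n bytes.
-- In this model the final chunk may be shorter than n.
rechunk :: Int -> [B.ByteString] -> [B.ByteString]
rechunk n pieces
  | n <= 0    = error "rechunk: chunk size must be positive"
  | otherwise = go (B.concat pieces)
  where
    go bs
      | B.null bs = []
      | otherwise =
          let (chunk, rest) = B.splitAt n bs
          in  chunk : go rest
```

For example, rechunk 4 applied to the pieces "ab", "cdefg", "h", "ij" yields "abcd", "efgh", "ij": the boundaries of the incoming pieces disappear and only the fixed chunk size remains.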

Compare the type of this to Enumerator:

type Enumerator s m a = Iteratee s m a -> m (Iteratee s m a)

So enumGroupChunked is basically a fancy enumerator which changes the stream type. This means that it takes a [ByteString] iteratee consumer, and returns an iteratee which consumes plain bytestrings. Often the return type of an enumerator doesn't matter; it's simply an iteratee which you evaluate with run (or tryRun) to get at the output, so you could do the same here:

evalGroupChunked :: Iteratee [ByteString] IO a -> IO a
evalGroupChunked i = enumGroupChunked i >>= run

If you have more complicated processing to do on each group, the easiest place to do so would be in the enumGroupChunked function.

Stream of groups

Now that this is out of the way, what to do about the stream of groups? The answer depends on how you want to consume them. If you want to treat each group in the stream independently, I would do something similar to this:

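foldStream :: Iteratee [ByteString] IO a -> (b -> a -> b) -> b -> IO b
foldStream iter f acc0 = do
  -- run the same iteratee over the current group, starting from a fresh state
  val <- evalGroupChunked iter
  res <- getNextItem
  -- the response codes of getNextItem are assumed here; the question
  -- names STREAM_FINISHED as its end-of-stream code
  case res of
        OPERATION_SUCCEEDED -> foldStream iter f $! f acc0 val
        NO_MORE_DATA        -> return $ f acc0 val
        GET_DATA_FAILED     -> error "had a problem"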

However, let's say you want to do some sort of stream processing of the entire dataset, not just individual groups. That is, you have a

bigProc :: Iteratee [ByteString] IO a

that you want to run over the entire dataset. This is where the return iteratee of an enumerator is useful. Some earlier code will be slightly different now:

enumGroupChunked' :: Iteratee [ByteString] IO a
  -> IO (Iteratee ByteString IO (Iteratee [ByteString] IO a))
enumGroupChunked' = enumGroup . group groupSize

procStream :: Iteratee [ByteString] IO a -> IO a
procStream iter = do
  -- running the outer iteratee yields the inner one, with its state intact
  i' <- enumGroupChunked' iter >>= run
  res <- getNextItem
  case res of
        OPERATION_SUCCEEDED -> procStream i'
        NO_MORE_DATA        -> run i'
        GET_DATA_FAILED     -> error "had a problem"

This usage of nested iteratees (i.e. Iteratee s1 m (Iteratee s2 m a)) is slightly uncommon, but it's particularly helpful when you want to sequentially process data from multiple Enumerators. The key is to recognize that running the outer iteratee will give you an iteratee which is ready to receive more data. It's a model that works well in this case, because you can enumerate each group independently but process them as a single stream.
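The idea of getting back an iteratee that is ready for more data can be seen in a stripped-down toy model. The Iter type, feed, finish and sumIter below are all invented for this sketch and are much simpler than the real Data.Iteratee types (no monad, no error handling); the point is only that a consumer fed by one source can be handed to the next source with its state intact.

```haskell
-- A toy iteratee: either finished with a value, or waiting for the next
-- input (Just x) or for end of stream (Nothing).
data Iter s a = Done a | Cont (Maybe s -> Iter s a)

-- A toy enumerator: push a list of inputs into the iteratee, but do NOT
-- signal end of stream, so the result can still accept more data.
feed :: [s] -> Iter s a -> Iter s a
feed (x : xs) (Cont k) = feed xs (k (Just x))
feed _        it       = it

-- Signal end of stream and extract the result, if the iteratee yields one.
finish :: Iter s a -> Maybe a
finish (Done a) = Just a
finish (Cont k) = case k Nothing of
  Done a -> Just a
  Cont _ -> Nothing

-- A consumer that sums everything it sees until end of stream.
sumIter :: Iter Int Int
sumIter = go 0
  where
    go acc = Cont step
      where
        step (Just x) = go (acc + x)
        step Nothing  = Done acc

-- Two separate sources, one consumer: the running total carries over.
twoSources :: Maybe Int
twoSources = finish (feed [4, 5] (feed [1, 2, 3] sumIter))
```

Here each call to feed plays the role of enumerating one group, and finish plays the role of the final run in procStream; twoSources evaluates to Just 15 because the second source continues where the first one stopped.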

One caution: the inner iteratee will be in whatever state it was left in. Suppose that the last chunk of a group may be smaller than a full chunk, e.g.

   Group A               Group B               Group C
   1024, 1024, 512       1024, 1024, 1024      1024, 1024, 1024

What will happen in this case is that, because group is combining data into chunks of size 1024, it will combine the last chunk of Group A with the first 512 bytes of Group B. This isn't a problem with the foldStream example because that code terminates the inner iteratee (with joinI). That means the groups are truly independent, so you have to treat them as such. If you want to combine the groups as in procStream, you have to think of the entire stream. If this is your case, then you'll need to use something more sophisticated than just group.
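The carried-over state can be modelled with a small pure sketch. The names pushPiece, pushGroup and ChunkerOut are hypothetical helpers written for this illustration; they mimic a fixed-size chunker that keeps leftover bytes between inputs, and are not how Data.Iteratee implements group.

```haskell
import qualified Data.ByteString.Char8 as B

-- Completed chunks, plus the bytes still waiting to fill a chunk.
type ChunkerOut = ([B.ByteString], B.ByteString)

-- Push one piece into a chunker that is holding `leftover` bytes.
pushPiece :: Int -> B.ByteString -> B.ByteString -> ChunkerOut
pushPiece n leftover piece
  | n <= 0    = error "pushPiece: chunk size must be positive"
  | otherwise = go (B.append leftover piece)
  where
    go bs
      | B.length bs >= n =
          let (chunk, rest)  = B.splitAt n bs
              (chunks, left) = go rest
          in  (chunk : chunks, left)
      | otherwise = ([], bs)

-- Push a whole group, starting from the given leftover state.
pushGroup :: Int -> B.ByteString -> [B.ByteString] -> ChunkerOut
pushGroup n leftover = foldl step ([], leftover)
  where
    step (out, left) piece =
      let (chunks, left') = pushPiece n left piece
      in  (out ++ chunks, left')

groupA, groupB :: [B.ByteString]
groupA = [B.replicate 1024 'a', B.replicate 1024 'a', B.replicate 512 'a']
groupB = replicate 3 (B.replicate 1024 'b')

-- Shared state, as in procStream: A's 512 leftover bytes flow into B.
sharedB :: ChunkerOut
sharedB = pushGroup 1024 leftoverA groupB
  where
    (_, leftoverA) = pushGroup 1024 B.empty groupA

-- Fresh state per group, as in foldStream.
freshB :: ChunkerOut
freshB = pushGroup 1024 B.empty groupB
```

With shared state, the first chunk produced while processing Group B holds 512 bytes of 'a' followed by 512 bytes of 'b', and 512 bytes of 'b' are still pending at the end. With fresh state, every chunk of Group B contains only 'b' and nothing is left over.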

Data.Iteratee vs Data.Enumerator

Without getting into a debate of the merits of either package, not to mention IterIO (I'm admittedly biased), I would like to point out what I consider the most significant difference between the two: the abstraction of the stream.

In Data.Iteratee, a consumer Iteratee ByteString m a operates on a notional ByteString of some length, with access to a single chunk of ByteString at one time.

In Data.Enumerator, a consumer Iteratee ByteString m a operates on a notional [ByteString], with access to one or more elements (bytestrings) at one time.

This means that most Data.Iteratee operations are element-focused (with an Iteratee ByteString they'll operate on a single Word8), whereas Data.Enumerator operations are chunk-focused, operating on a whole ByteString.

You can think of Data.Iteratee.Iteratee [s] m a === Data.Enumerator.Iteratee s m a.
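The two views can be sketched without either library, using a plain list of chunks as the underlying data. The functions firstElement and firstChunk are made up for this comparison and only illustrate what "the next item of the stream" means under each abstraction.

```haskell
import qualified Data.ByteString as BS
import Data.Word (Word8)

-- The same underlying data, delivered as two chunks.
chunks :: [BS.ByteString]
chunks = [BS.pack [1, 2, 3], BS.pack [4, 5]]

-- Element-focused view (Data.Iteratee style): the stream is notionally one
-- long ByteString, so the next item is a single Word8.
firstElement :: [BS.ByteString] -> Maybe Word8
firstElement cs = fmap fst (BS.uncons (BS.concat cs))

-- Chunk-focused view (Data.Enumerator style): the stream is notionally a
-- [ByteString], so the next item is a whole ByteString.
firstChunk :: [BS.ByteString] -> Maybe BS.ByteString
firstChunk []      = Nothing
firstChunk (c : _) = Just c
```

Under the first view the chunk boundaries are an implementation detail; under the second they are part of what the consumer sees.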

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow