How do you combine multiple Scalaz-Streams such that order of completion is preserved but interleaving isn't enforced?

StackOverflow https://stackoverflow.com/questions/21889760

質問

var num =0
var num2 = 3333
val p2 = Process.eval {
  Thread.sleep(10000)
  Task.delay {
    Thread.sleep(10000)
    num2 = num2 + 1
    s"hi ${num2}"
  }
}.repeat.take(15)

//p2: scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] =
// Await(scalaz.concurrent.Task@5a554f1c,
//<function1>,Halt(scalaz.stream.Process$End$),Halt(scalaz.stream.Process$End$))

val p1 = Process.eval {
  Thread.sleep(2000)
  Task.delay { 
    Thread.sleep(2000)
    num = num + 1
    s"hi $num"
  }
}.repeat.take(15)

//p1: scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = 
// Await(scalaz.concurrent.Task@7a54e904,
// <function1>,Halt(scalaz.stream.Process$End$),Halt(scalaz.stream.Process$End$))

// this interleaves them and I get disjunctions showing me their order
(p1 either p2).map(println).run.run

// this gives me the strings that are interleaved
(p1 interleave p2).map(println).run.run

How do you get a Process that is the combination of the 2 Processes but just in whatever order they arrive (meaning if the left goes twice before the right, that's ok, give the left twice and then when the right arrives emit it afterwards)?

I'm looking for the one with the shorter sleep to occur more frequently and see it come out multiple times before the slower process. Thanks in advance for anyone taking the time to read this, and especially to those who can share some insight.

役に立ちましたか?

解決

Eric,

the non-deterministical interleave is implemented in scalaz-stream via Process.wye, and in fact either is one of the non-deterministical combinators using wye. The reason you see them interleave left/right is because it tries to be fair and because you blocking the thread. Try to create one side that is slower than the second one and you will see the either is Non-deterministical.

Note in order to achieve non-determinsitical behaviour you need actually processes that are run from Two threads, your p1 process is actually blocking the single thread and as such in your scenario the order is always deterministical

try:

val p1 = Process(1,2,3).toSource
val p2 = Process(10) fby Process.sleep(1 second) fby Process(20,30).toSource

(p1 either p2).runLog.run.foreach(println)

That should emit

-\/(1)
\/-(10)
-\/(2)
-\/(3)
\/-(20)
\/-(30)
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top