Question

I have a slice of channels that all receive the same message:

func broadcast(c <-chan string, chans []chan<- string) {
    for msg := range c {
        for _, ch := range chans {
            ch <- msg
        }
    }
}

However, since each of the channels in chans is potentially read at a different rate, I don't want a slow consumer to block the other channels. I've solved this with goroutines:

func broadcast(c <-chan string, chans []chan<- string) {
    for msg := range c {
        for _, ch := range chans {
            go func() { ch <- msg }()
        }
    }
}

However, the order of the messages that get passed to each channel is important. I looked to the spec to see if channels preserve order when blocked, and all I found was this:

If the capacity is greater than zero, the channel is asynchronous: communication operations succeed without blocking if the buffer is not full (sends) or not empty (receives), and elements are received in the order they are sent.

To me, if a write is blocked, then it is not "sent", but waiting to be sent. With that assumption, the above says nothing about order of sending when multiple goroutines are blocked on writing.

Are there any guarantees about the order of sends after a channel becomes unblocked?


Solution

No, there are no guarantees.

Even when the channel is not full, if two goroutines are started at about the same time to send to it, I don't think there is any guarantee that the goroutine that was started first would actually execute first. So you can't count on the messages arriving in order.

OTHER TIPS

You can drop the message if the channel is full, then set a flag to pause the client and notify it that messages are being dropped.

Something along the lines of (untested):

import "log"

type Client struct {
    Name string
    ch   chan<- string
}

func broadcast(c <-chan string, clients []*Client) {
    for msg := range c {
        for _, client := range clients {
            select {
            case client.ch <- msg:
                // Sent without blocking.
            default:
                // The send would block (buffer full or no receiver ready): drop the message.
                log.Printf("Channel was full sending '%s' to client %s", msg, client.Name)
            }
        }
    }
}

In this code, there are no guarantees.

The main problem with the given sample code lies not in the channel behavior, but in the many goroutines it creates. All the goroutines are started inside the same nested loop without further synchronization, so even before they start to send messages, we simply don't know which ones will execute first.

However, this raises a legitimate question in general: if we somehow guarantee the order of several blocking send instructions, are we guaranteed to receive the values in the same order?

A "happens-before" relationship between the sends is difficult to create. I fear it is impossible because:

  1. Anything can happen before the send instruction: for example, other goroutines may or may not perform their own sends
  2. A goroutine blocked in a send cannot simultaneously take part in any other kind of synchronization

For example, if I have 10 goroutines numbered 1 to 10, I have no way of letting them send their own number to the channel, concurrently, in the right order. All I can do is use various kinds of sequential tricks, like doing the sorting in a single goroutine.

This is an addition to the already posted answers.

As practically everyone has stated, the problem is the order of execution of the goroutines. You can coordinate goroutine execution using channels by passing around the number of the goroutine you want to run next:

func coordinated(coord chan int, num, max int, work func()) {
    for {
        n := <-coord

        if n == num {
            work()
            coord <- (n+1) % max
        } else {
            coord <- n
        }
    }
}

coord := make(chan int)

go coordinated(coord, 0, 3, func() { println("0"); time.Sleep(1 * time.Second) })
go coordinated(coord, 1, 3, func() { println("1"); time.Sleep(1 * time.Second) })
go coordinated(coord, 2, 3, func() { println("2"); time.Sleep(1 * time.Second) })

coord <- 0

or by using a central goroutine which executes the workers in an ordered manner:

func executor(funs chan func()) {
    for {
        worker := <-funs
        worker()
        funs <- worker
    }
}

funs := make(chan func(), 3)

funs <- func() { println("0"); time.Sleep(1 * time.Second) }
funs <- func() { println("1"); time.Sleep(1 * time.Second) }
funs <- func() { println("2"); time.Sleep(1 * time.Second) }

go executor(funs)

These methods will, of course, remove all parallelism because of the synchronization. However, the concurrent structure of your program remains.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow