Question

Note - newbie in Go.

I've written a multiplexer that should merge the outputs of an array of channels into one. Happy with constructive criticism.

func Mux(channels []chan big.Int) chan big.Int {
    // Count down as each channel closes. When hits zero - close ch.
    n := len(channels)
    // The channel to output to.
    ch := make(chan big.Int, n)

    // Make one go per channel.
    for _, c := range channels {
        go func() {
            // Pump it.
            for x := range c {
                ch <- x
            }
            // It closed.
            n -= 1
            // Close output if all closed now.
            if n == 0 {
                close(ch)
            }
        }()
    }
    return ch
}

I am testing it with:

func fromTo(f, t int) chan big.Int {
    ch := make(chan big.Int)

    go func() {
        for i := f; i < t; i++ {
            fmt.Println("Feed:", i)
            ch <- *big.NewInt(int64(i))
        }
        close(ch)
    }()
    return ch
}

func testMux() {
    r := make([]chan big.Int, 10)
    for i := 0; i < 10; i++ {
        r[i] = fromTo(i*10, i*10+10)
    }
    all := Mux(r)
    // Roll them out.
    for l := range all {
        fmt.Println(l)
    }
}

but my output is very strange:

Feed: 0
Feed: 10
Feed: 20
Feed: 30
Feed: 40
Feed: 50
Feed: 60
Feed: 70
Feed: 80
Feed: 90
Feed: 91
Feed: 92
Feed: 93
Feed: 94
Feed: 95
Feed: 96
Feed: 97
Feed: 98
Feed: 99
{false [90]}
{false [91]}
{false [92]}
{false [93]}
{false [94]}
{false [95]}
{false [96]}
{false [97]}
{false [98]}
{false [99]}

So to my questions:

  • Is there something I am doing wrong in Mux?
  • Why am I only getting the last 10 from my output channel?
  • Why does the feeding look so strange? (1st of each input channel, all of the last channel and then nothing)
  • Is there a better way of doing this?

I need all of the input channels to have equal rights to the output channel - i.e. I cannot have all of the output from one channel and then all from the next etc.


For anyone interested - this was the final code after the fix and the correct (presumably) use of sync.WaitGroup

import (
    "math/big"
    "sync"
)

/*
  Multiplex a number of channels into one.
*/
func Mux(channels []chan big.Int) chan big.Int {
    // Track the pump goroutines so ch can be closed once all of them have finished.
    var wg sync.WaitGroup
    wg.Add(len(channels))
    // The channel to output to.
    ch := make(chan big.Int, len(channels))

    // Make one go per channel.
    for _, c := range channels {
        go func(c <-chan big.Int) {
            // Pump it.
            for x := range c {
                ch <- x
            }
            // It closed.
            wg.Done()
        }(c)
    }
    // Close the channel when the pumping is finished.
    go func() {
        // Wait for everyone to be done.
        wg.Wait()
        // Close.
        close(ch)
    }()
    return ch
}

Solution

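Every goroutine spawned from Mux ends up pulling from the same channel. The closures capture the variable c itself, not the value it held when the goroutine was started, and (in Go versions before 1.22) that single variable is reassigned on each iteration of the loop. By the time the goroutines run, c usually refers to the last channel, which is why you only see 90 to 99. It also explains the feeding pattern: the other nine producers block on their first send because nothing ever reads from them. You will get the expected results if you pass the channel to the goroutine as an argument, like so: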

for _, c := range channels {
    go func(c <-chan big.Int) {
        ...
    }(c)
}

You can test this modification here.

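One other possible problem is your handling of the n variable: the goroutines decrement and test it without any synchronization, which is a data race. If you're running with GOMAXPROCS != 1, two goroutines could update it at once, so a decrement could be lost (ch is never closed) or two goroutines could both see n == 0 (close is called twice and panics). The sync.WaitGroup type would be a safer way to wait for goroutines to complete.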
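
If you would rather keep the count-down design from the question than switch to sync.WaitGroup, the counter can be decremented atomically instead. The following is only a sketch under a few assumptions: it uses chan int instead of chan big.Int to stay short, and muxCounted is a made-up name, not something from the question.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// muxCounted merges the inputs into one channel and closes the output
// when the last input closes. The countdown is atomic, so concurrent
// decrements cannot be lost and only one goroutine sees zero.
func muxCounted(channels []chan int) chan int {
	out := make(chan int, len(channels))
	if len(channels) == 0 {
		// Nothing to pump: close right away so readers do not block.
		close(out)
		return out
	}
	remaining := int64(len(channels))
	for _, c := range channels {
		go func(c <-chan int) {
			// Pump this input until it is closed.
			for x := range c {
				out <- x
			}
			// Exactly one goroutine observes the counter reaching zero.
			if atomic.AddInt64(&remaining, -1) == 0 {
				close(out)
			}
		}(c)
	}
	return out
}

// fromTo emits f, f+1, ..., t-1 and then closes its channel.
func fromTo(f, t int) chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := f; i < t; i++ {
			ch <- i
		}
	}()
	return ch
}

func main() {
	inputs := make([]chan int, 10)
	for i := range inputs {
		inputs[i] = fromTo(i*10, i*10+10)
	}
	count := 0
	for range muxCounted(inputs) {
		count++
	}
	fmt.Println("received:", count) // prints "received: 100"
}
```

The order in which values arrive is not deterministic, so the example only counts them.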

OTHER TIPS

A bit after the fact, I know, but I wrote a package which implements a generic Multiplex function similar to this one. It uses the Select function from the reflect package, which performs a select over a set of channels chosen at run time, to ensure efficient and balanced multiplexing without any need for a lock or wait group.
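
To give an idea of what a reflect.Select based multiplexer can look like, here is a minimal sketch. It is not the code of the package mentioned above; muxSelect is a hypothetical name, and chan int is used instead of chan big.Int to keep it short. A single goroutine selects over all inputs and drops an input's case once that input is closed.

```go
package main

import (
	"fmt"
	"reflect"
)

// muxSelect merges the inputs using reflect.Select in one goroutine.
func muxSelect(channels []chan int) chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		// Build one receive case per input channel.
		cases := make([]reflect.SelectCase, len(channels))
		for i, c := range channels {
			cases[i] = reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(c),
			}
		}
		// Keep selecting until every input has been closed.
		for len(cases) > 0 {
			i, v, ok := reflect.Select(cases)
			if !ok {
				// Input i is closed: remove its case and carry on.
				cases = append(cases[:i], cases[i+1:]...)
				continue
			}
			out <- int(v.Int())
		}
	}()
	return out
}

// fromTo emits f, f+1, ..., t-1 and then closes its channel.
func fromTo(f, t int) chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := f; i < t; i++ {
			ch <- i
		}
	}()
	return ch
}

func main() {
	inputs := make([]chan int, 10)
	for i := range inputs {
		inputs[i] = fromTo(i*10, i*10+10)
	}
	count := 0
	for range muxSelect(inputs) {
		count++
	}
	fmt.Println("received:", count) // prints "received: 100"
}
```

Like the select statement, reflect.Select picks pseudo-randomly among the cases that are ready, which is where the balancing comes from. The reflection overhead is worth measuring before relying on this in a hot path.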

To build on James Hentridge's answer, an idiomatic way to handle the reassignment problem when using the range statement is to copy the loop variable into a new local variable inside the loop body, so that each closure captures its own copy:

for _, c := range channels {
    c := c
    go func() {
        ...
    }()
}
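
For completeness, this is roughly how the shadowing idiom looks inside a whole multiplexer. Treat it as a sketch: muxShadow is a made-up name, and chan int stands in for chan big.Int to keep the example small.

```go
package main

import (
	"fmt"
	"sync"
)

// muxShadow merges the inputs into one channel, using a shadowed loop
// variable so that every goroutine reads from its own input.
func muxShadow(channels []chan int) chan int {
	out := make(chan int, len(channels))
	var wg sync.WaitGroup
	wg.Add(len(channels))
	for _, c := range channels {
		// New variable per iteration; the closure below captures this
		// copy. (From Go 1.22 on the loop variable is already
		// per-iteration, so the copy is harmless but not needed.)
		c := c
		go func() {
			defer wg.Done()
			for x := range c {
				out <- x
			}
		}()
	}
	// Close the output once every pump goroutine has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromTo emits f, f+1, ..., t-1 and then closes its channel.
func fromTo(f, t int) chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := f; i < t; i++ {
			ch <- i
		}
	}()
	return ch
}

func main() {
	inputs := make([]chan int, 10)
	for i := range inputs {
		inputs[i] = fromTo(i*10, i*10+10)
	}
	count := 0
	for range muxShadow(inputs) {
		count++
	}
	fmt.Println("received:", count) // prints "received: 100"
}
```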
Licensed under: CC-BY-SA with attribution