Question

TL;DR: Please just go to the last part and tell me how you would solve this problem.

I started using Go this morning, coming from Python. I want to call a closed-source executable from Go several times, with a bit of concurrency and different command-line arguments. My resulting code works well, but I'd like your input in order to improve it. Since I'm at an early learning stage, I'll also explain my workflow.

For the sake of simplicity, assume here that this "external closed-source program" is zenity, a Linux command line tool that can display graphical message boxes from the command line.

Calling an executable file from Go

So, in Go, I would go like this:

package main
import "os/exec"
func main() {
    cmd := exec.Command("zenity", "--info", "--text='Hello World'")
    cmd.Run()
}

This works as expected. Note that .Run() is the functional equivalent of .Start() followed by .Wait(). This is great, but if I only wanted to execute this program once, the whole programming effort would not be worth it. So let's do it multiple times.

Calling an executable multiple times

Now that I had this working, I'd like to call my program multiple times, with custom command line arguments (here just i for the sake of simplicity).

package main    
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 // Number of times the external program is called
    for i:=0; i<NumEl; i++ {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}

Ok, we did it! But I still can't see the advantage of Go over Python … This piece of code is actually executed serially. I have a multi-core CPU and I'd like to take advantage of it. So let's add some concurrency with goroutines.

Goroutines, or a way to make my program parallel

a) First attempt: just add "go"s everywhere

Let's rewrite our code to make things easier to call and reuse and add the famous go keyword:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    for i:=0; i<NumEl; i++ {
        go callProg(i)  // <--- There!
    }
}

func callProg(i int) {
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

Nothing! What is the problem? All the goroutines are launched at once, but the Go program exits before any zenity process can even be started: when main returns, the program terminates without waiting for outstanding goroutines. This was confirmed with time.Sleep: waiting a couple of seconds was enough to let the 8 instances of zenity launch. This is documented behavior rather than a bug.

To make it worse, the real program I'd actually like to call takes a while to execute. If I execute 8 instances of this program in parallel on my 4-core CPU, it's going to waste some time doing a lot of context switching … I don't know exactly how goroutines are scheduled, but each exec.Command call launches zenity in a separate external process, so this starts 8 processes at once. To make it even worse, I want to execute this program more than 100,000 times. Launching all of those goroutines at once won't be efficient at all. Still, I'd like to leverage my 4-core CPU!

b) Second attempt: use pools of goroutines

Online resources tend to recommend sync.WaitGroup for this kind of work. The problem with that approach (at least when used naively) is that you are basically working with batches of goroutines: if I launch 4 goroutines and Wait on a WaitGroup of 4, the program waits for all 4 external programs to finish before launching a new batch of 4. This is not efficient: CPU is wasted, once again.

Some other resources recommended the use of a buffered channel to do the work:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    for i:=0; i<NumEl; i++ {
        go callProg(i, c)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
}

func callProg(i int, c chan bool) {
    defer func () {<- c}()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

This seems ugly. Channels were not intended for this purpose: I'm exploiting a side-effect. I love the concept of defer but I hate having to declare a function (even a lambda) to pop a value out of the dummy channel that I created. Oh, and of course, using a dummy channel is, by itself, ugly.

c) Third attempt: die when all the children are dead

Now we are nearly finished. I just have to take into account yet another side effect: the Go program closes before all the zenity pop-ups are closed. This is because when the loop is finished (at the 8th iteration), nothing prevents the program from exiting. This time, sync.WaitGroup will be useful.

package main
import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    wg := new(sync.WaitGroup)
    wg.Add(NumEl)            // Set the number of goroutines to (0 + NumEl)
    for i:=0; i<NumEl; i++ {
        go callProg(i, c, wg)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
    wg.Wait() // Wait for all the children to die
    close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
    defer func () {
        <- c
        wg.Done() // Decrease the number of alive goroutines
    }()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

Done.

My questions

  • Do you know any other proper way to limit the number of goroutines executed at once?

I don't mean threads; how Go manages goroutines internally is not relevant here. I really mean limiting the number of goroutines launched at once: each exec.Command call spawns a new external process, so I should control the number of times it is called.

  • Does that code look fine to you?
  • Do you know how to avoid the use of a dummy channel in that case?

I can't convince myself that such dummy channels are the way to go.


Solution

I would spawn 4 worker goroutines that read the tasks from a common channel. Goroutines that are faster than others (because they are scheduled differently or happen to get simple tasks) will receive more tasks from this channel than the others. In addition to that, I would use a sync.WaitGroup to wait for all workers to finish. The remaining part is just the creation of the tasks. You can see an example implementation of that approach here:

package main

import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    tasks := make(chan *exec.Cmd, 64)

    // spawn four worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            for cmd := range tasks {
                cmd.Run()
            }
            wg.Done()
        }()
    }

    // generate some tasks
    for i := 0; i < 10; i++ {
        tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
    }
    close(tasks)

    // wait for the workers to finish
    wg.Wait()
}

There are probably other possible approaches, but I think this is a very clean solution that is easy to understand.

Other tips

A simple approach to throttling (execute f() N times, but with at most maxConcurrency running concurrently), just a sketch:

package main

import (
        "sync"
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
        const N = 100 // for example
        var wg sync.WaitGroup
        for i := 0; i < N; i++ {
                throttle <- 1 // whatever number
                wg.Add(1)
                go f(i, &wg, throttle)
        }
        wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
        defer wg.Done()
        // whatever processing
        println(i)
        <-throttle
}

Playground

I probably wouldn't call the throttle channel "dummy". IMHO it's an elegant way (it's not my invention, of course) to limit concurrency.

BTW: Please note that you're ignoring the returned error from cmd.Run().

try this: https://github.com/korovkin/limiter

 limit := limiter.NewConcurrencyLimiter(10)
 limit.Execute(func() {
        zenity(...)
 })
 limit.Wait()

🧩 Modules


📃 Template

package main

import (
    "fmt"
    "github.com/zenthangplus/goccm"
    "math/rand"
    "runtime"
)

func main() {
    semaphore := goccm.New(runtime.NumCPU())

    // Bounded loop: with an infinite `for`, WaitAllDone below would be unreachable
    for i := 0; i < 100; i++ {
        // Blocks until a concurrency slot is available
        semaphore.Wait()

        go func() {
            fmt.Println(rand.Int())
            semaphore.Done()
        }()
    }

    // Wait for the remaining goroutines to finish
    semaphore.WaitAllDone()
}

🎰 Optimal routine quantity

  • If the operation is CPU-bound: runtime.NumCPU()
  • Otherwise, benchmark with: time go run *.go

🔨 Configure

export GOPATH="$(pwd)/gopath"
go mod init example.com/demo # any module path works; *.go is not a valid argument
go mod tidy

🧹 CleanUp

find "${GOPATH}" -exec chmod +w {} \;
rm --recursive --force "${GOPATH}"

You could use the Worker Pool pattern described in this post. This is how an implementation would look:

package main
import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8
    pool := 4
    intChan := make(chan int)

    var wg sync.WaitGroup
    for i := 0; i < pool; i++ {
        wg.Add(1)
        go callProg(intChan, &wg) // <--- launch the worker routines
    }

    for i := 0; i < NumEl; i++ {
        intChan <- i // <--- push data which will be received by workers
    }

    close(intChan) // <--- closing the channel ends the workers' range loops
    wg.Wait()      // <--- without this, main could exit while commands are still running
}

func callProg(intChan chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := range intChan {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
        cmd.Run()
    }
}
License: CC-BY-SA with attribution