Building on the answer by @BraveNewCurrency, I have put together a simple example program for you:
package main
import (
"encoding/json"
"fmt"
"os"
)
// Result is the value a decoded JSON input is stored in. Its exported field
// names are the keys encoding/json matches against.
type Result struct {
	Some, Another string
	AndAn         int
}
// generateWork opens a fixed list of JSON files, in order, and sends each
// open handle on work. It panics with the error from os.Open if a file cannot
// be opened. The channel is not closed afterwards, and the handles are handed
// to the receiver still open.
func generateWork(work chan *os.File) {
	paths := [...]string{
		"/home/foo/a.json",
		"/home/foo/b.json",
		"/home/foo/c.json",
	}
	for i := range paths {
		f, err := os.Open(paths[i])
		if err != nil {
			panic(err)
		}
		work <- f
	}
}
// processWork takes a single file from work, decodes its JSON content into a
// Result and sends that Result on done. Each call handles exactly one file.
//
// The file is closed as soon as decoding has finished, before the (possibly
// blocking) send on done. If work has been closed, or a nil file is received,
// processWork returns without sending anything. A decode error is reported on
// stderr; the Result is still sent so that a consumer counting results sees
// one per file.
func processWork(work chan *os.File, done chan Result) {
	file, ok := <-work
	if !ok || file == nil {
		return
	}

	var result Result
	err := json.NewDecoder(file).Decode(&result)
	// The file was only read from, so a failing Close cannot lose data;
	// its error is deliberately ignored.
	_ = file.Close()
	if err != nil {
		fmt.Fprintf(os.Stderr, "decoding %s: %v\n", file.Name(), err)
	}
	done <- result
}
func main() {
work := make(chan *os.File)
go generateWork(work)
done := make(chan Result)
for i := 0; i < 100; i++ {
go processWork(work, done)
}
for {
select {
case result := <-done:
// a result is available
fmt.Println(result)
}
}
}
Note that this program won't work on the playground because file-system access is disallowed there.
Edit:
To address the edit to your question, I've taken your code and changed a few small things:
package main
import (
_ "encoding/json"
"fmt"
_ "io/ioutil"
_ "os"
)
// TJsonMetaInfo carries a single string field, MetaSystem.
// NOTE(review): not referenced by any code visible here; presumably carried
// over from the question's code. Confirm before removing.
type TJsonMetaInfo struct {
MetaSystem string
}
// TJsonFileInfo is the item handed from one pipeline stage to the next over
// the channels; it records the name of the file being handled.
type TJsonFileInfo struct {
FileName string
}
// TChannelTracer counts how often each of the phases A, B and C has been
// visited, so the totals can be displayed.
type TChannelTracer struct {
	A int
	B int
	C int
}
var (
	// ChannelTracer is the package-level phase counter; it starts at zero.
	ChannelTracer TChannelTracer

	// jsonFileList names the files that are fed through the pipeline.
	jsonFileList = []string{
		"./files/classA.json",
		"./files/classB.json",
		"./files/classC.json",
	}
)
// LoadJsonFiles is phase A of the pipeline. It wraps aFileName in a
// TJsonFileInfo, increments the phase-A counter, prints the file name and
// sends a pointer to the new value on aResultQueueChan. Reading the file
// itself is left out of this example.
func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
	info := &TJsonFileInfo{FileName: aFileName}
	// Actual loading (e.g. ioutil.ReadFile(info.FileName)) would go here.

	ChannelTracer.A++
	fmt.Printf("A. Loaded file: %s\n", info.FileName)
	aResultQueueChan <- info
}
// UnmarshalFiles is phase B of the pipeline. For every item received from
// aWorkQueueChan it increments the phase-B counter, prints the file name and
// forwards the same pointer on aResultQueueChan. No actual unmarshalling is
// done in this example.
//
// The function returns once aWorkQueueChan has been closed and drained.
// Ranging over the channel avoids receiving a nil item from a closed channel
// and dereferencing it. aResultQueueChan is not closed here, since other
// senders may share it.
func UnmarshalFiles(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
	for fileInfo := range aWorkQueueChan {
		ChannelTracer.B++
		fmt.Printf("B. Unmarshalled file: %s\n", fileInfo.FileName)
		aResultQueueChan <- fileInfo
	}
}
// ProcessWork is phase C of the pipeline. For every item received from
// aWorkQueueChan it increments the phase-C counter, prints the file name and
// forwards the same pointer on aDoneQueueChan.
//
// The function returns once aWorkQueueChan has been closed and drained.
// Ranging over the channel avoids receiving a nil item from a closed channel
// and dereferencing it. aDoneQueueChan is not closed here, since other
// senders may share it.
func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
	for fileInfo := range aWorkQueueChan {
		ChannelTracer.C++
		fmt.Printf("C. Processed file: %s \n", fileInfo.FileName)
		aDoneQueueChan <- fileInfo
	}
}
// main builds the three-phase pipeline: one LoadJsonFiles goroutine per entry
// in jsonFileList feeds marshalChan, UnmarshalFiles moves items on to
// processChan, and ProcessWork delivers them on doneProcessingChan, where main
// prints the tracer counters for every finished item.
//
// Nothing closes doneProcessingChan, so after the last item the receive below
// blocks forever.
func main() {
	var (
		marshalChan        = make(chan *TJsonFileInfo)
		processChan        = make(chan *TJsonFileInfo)
		doneProcessingChan = make(chan *TJsonFileInfo)
	)

	go UnmarshalFiles(marshalChan, processChan)
	go ProcessWork(processChan, doneProcessingChan)
	for i := range jsonFileList {
		go LoadJsonFiles(jsonFileList[i], marshalChan)
	}

	for {
		result := <-doneProcessingChan
		_ = result.FileName // dummy use
		fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
	}
}
Note that this code still deadlocks, but only at the very end, once all work is complete: the final for loop in main() keeps waiting on a channel that no goroutine sends to anymore.
Note also that these lines:
ChannelTracer.A += 1
ChannelTracer.B += 1
ChannelTracer.C += 1
are not concurrency-safe. This means that in a multi-threaded environment two goroutines might try to increment the same counter at the same time, resulting in a wrong count. To work around this issue, take a look at the following packages: