Question

I would like to load some JSON files (".json") using a goroutine called from a for-loop. I'd like the loading to be parallelized, so that the first files are processed while the remaining files are still being loaded.

Q1. Since the number of files may vary (new ones may be added), I would use a list of file names (the names are only auto-generated in this example), and therefore a for-loop. Is that optimal?

Q2. What would be the most effective use of channel(s)?

Q3. How would I define the channel(s) if a unique channel for each load operation (as in the example code below) is needed?

Example code (to be compacted and made capable of loading the files from a list of file names):


package main

import "fmt"

func load_json(aChan chan byte, s string) {
    // load "filename" + s + ".json"
    // confirm completion on the channel
    aChan <- 0
}

func do_stuff() {
    // .. with the newly loaded json
}

func main() {
    chan_A := make(chan byte)
    go load_json(chan_A, "_classA")

    chan_B := make(chan byte)
    go load_json(chan_B, "_classB")

    chan_C := make(chan byte)
    go load_json(chan_C, "_classC")

    chan_D := make(chan byte)
    go load_json(chan_D, "_classD")


    <-chan_A
        // Now, do stuff with Class A
    <-chan_B
        // etc...
    <-chan_C
    <-chan_D
    fmt.Println("Done.")
}

EDIT: I designed a simplified test solution based on the ideas suggested by "Tom" (see below). In my case I split the task into three phases, using one channel per phase to control the execution. However, I tend to get deadlocks with this code (see the execution results and the note below the code).

Run this code on the Playground.

How can I avoid the deadlocks in this code?:

package main

import "fmt"

type TJsonFileInfo struct {
    FileName string
}
type TChannelTracer struct {  // Will count & display visited phases A, B, C
    A, B, C int
}
var ChannelTracer TChannelTracer

var jsonFileList = []string{
    "./files/classA.json",
    "./files/classB.json",
    "./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
    var newFileInfo TJsonFileInfo
    newFileInfo.FileName = aFileName
    // file, e := ioutil.ReadFile(newFileInfo.FileName)...
    ChannelTracer.A += 1
    fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
    aResultQueueChan <- &newFileInfo
}

func UnmarshalFile(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
    FileInfo := <-aWorkQueueChan
    ChannelTracer.B += 1
    fmt.Printf("B. Marshalled file: %s\n", FileInfo.FileName)
    aResultQueueChan <- FileInfo
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
    FileInfo := <-aWorkQueueChan
    ChannelTracer.C += 1
    fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
    aDoneQueueChan <- FileInfo
}

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
        go UnmarshalFile(marshalChan, processChan)
        go ProcessWork(processChan, doneProcessingChan)
    }

    for {
        select {
        case result := <-marshalChan:
            result.FileName = result.FileName // dummy use
        case result := <-processChan:
            result.FileName = result.FileName // dummy use
        case result := <-doneProcessingChan:
            result.FileName = result.FileName // dummy use
            fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
        }
    }
}

/**
RESULTS (for phases A, B and C):

A. Loaded file: ./files/classA.json
A. Loaded file: ./files/classB.json
A. Loaded file: ./files/classC.json
B. Marshalled file: ./files/classB.json
B. Marshalled file: ./files/classC.json
C. Processed file: ./files/classB.json 
C. Processed file: ./files/classC.json 
Done. Channels visited: {3 2 2}     // ChannelTracer for phase A, B and C
Done. Channels visited: {3 2 2}
fatal error: all goroutines are asleep - deadlock!
*/

Note that this code doesn't access the file system, so it should run on the Playground.

EDIT2: Apart from the unsafe "ChannelTracer", I can avoid the deadlocks only by consuming doneProcessingChan the same number of times as there are file tasks.
Run the code here: Playground

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    go UnmarshalFiles(marshalChan, processChan)
    go ProcessWork(processChan, doneProcessingChan)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
    }

    //  Read doneProcessingChan equal number of times
    //  as the spawned tasks (files) above :
    for i := 0; i < len(jsonFileList); i++ {
        <-doneProcessingChan
        fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
    }
}

// RIL

Was it helpful?

Solution

Building on the answer by @BraveNewCurrency, I have composed a simple example program for you:

package main

import (
    "encoding/json"
    "fmt"
    "os"
)

type Result struct {
    Some    string
    Another string
    AndAn   int
}

func generateWork(work chan *os.File) {
    files := []string{
        "/home/foo/a.json",
        "/home/foo/b.json",
        "/home/foo/c.json",
    }
    for _, path := range files {
        file, e := os.Open(path)
        if e != nil {
            panic(e)
        }
        work <- file
    }
}

func processWork(work chan *os.File, done chan Result) {
    file := <-work
    defer file.Close()
    decoder := json.NewDecoder(file)
    result := Result{}
    if e := decoder.Decode(&result); e != nil {
        panic(e)
    }
    done <- result
}

func main() {
    work := make(chan *os.File)
    go generateWork(work)
    done := make(chan Result)
    for i := 0; i < 100; i++ {
        go processWork(work, done)
    }
    for {
        select {
        case result := <-done:
            // a result is available
            fmt.Println(result)
        }
    }
}

Note that this program won't work on the playground because file-system access is disallowed there.

Edit:

To answer the edit in your question, I've taken your code and changed a few small things:

package main

import (
    _ "encoding/json"
    "fmt"
    _ "io/ioutil"
    _ "os"
)

type TJsonMetaInfo struct {
    MetaSystem string
}

type TJsonFileInfo struct {
    FileName string
}

type TChannelTracer struct { // Will count & display visited phases A, B, C
    A, B, C int
}

var ChannelTracer TChannelTracer

var jsonFileList = []string{
    "./files/classA.json",
    "./files/classB.json",
    "./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
    newFileInfo := TJsonFileInfo{aFileName}
    // file, e := ioutil.ReadFile(newFileInfo.FileName)
    // etc...
    ChannelTracer.A += 1
    fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
    aResultQueueChan <- &newFileInfo
}

func UnmarshalFiles(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
    for {
        FileInfo := <-aWorkQueueChan
        ChannelTracer.B += 1
        fmt.Printf("B. Unmarshalled file: %s\n", FileInfo.FileName)
        aResultQueueChan <- FileInfo
    }
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
    for {
        FileInfo := <-aWorkQueueChan
        ChannelTracer.C += 1
        fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
        aDoneQueueChan <- FileInfo
    }
}

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    go UnmarshalFiles(marshalChan, processChan)
    go ProcessWork(processChan, doneProcessingChan)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
    }

    for {
        select {
        case result := <-doneProcessingChan:
            result.FileName = result.FileName // dummy use
            fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
        }
    }
}

Note that this code still deadlocks, but only at the very end, once all work is complete and the final for/select loop in main() blocks forever with no goroutines left to send on doneProcessingChan.
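One way to avoid that final deadlock is the approach from EDIT2 above: instead of looping forever, read from doneProcessingChan exactly once per file so that main() can return normally. A minimal sketch of the replacement loop, assuming the same jsonFileList, channels and ChannelTracer:

// Sketch: replaces the endless for/select at the end of main().
// Reading doneProcessingChan len(jsonFileList) times lets main return
// once every file has passed through phase C.
for i := 0; i < len(jsonFileList); i++ {
    result := <-doneProcessingChan
    fmt.Printf("Done: %s. Channels visited: %v\n", result.FileName, ChannelTracer)
}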

Note also that these lines:

ChannelTracer.A += 1
ChannelTracer.B += 1
ChannelTracer.C += 1

are not concurrency-safe: in a multi-threaded environment, one goroutine and another might try to increment the same counter at the same time, resulting in a wrong count. To work around this issue, take a look at the sync and sync/atomic packages in the standard library.
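For example, here is a minimal sketch of a concurrency-safe counter using sync/atomic; the field names mirror TChannelTracer, and switching the fields to int64 is an assumption made so the atomic functions can operate on them:

package main

import (
    "fmt"
    "sync/atomic"
)

// TChannelTracer with int64 fields so sync/atomic can operate on them.
type TChannelTracer struct {
    A, B, C int64
}

var ChannelTracer TChannelTracer

func main() {
    atomic.AddInt64(&ChannelTracer.A, 1)            // safe concurrent increment
    fmt.Println(atomic.LoadInt64(&ChannelTracer.A)) // safe concurrent read
}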

Other tips

You should structure your program this way (a full sketch follows below):

1) the main routine creates a channel for "work to do" and probably one for "done work" (both channels should probably have some buffering)

2) spin off one goroutine to generate the file list and put them in the "work to do" channel.

3) spin up N goroutines (in a for loop) to process files. The routine will read the file from the "work to do" channel, process it, and send the response to the "done work" channel.

4) the main routine waits on "done work" and prints them or whatever.

The optimal "N" above varies depending on the problem:

- If your work is CPU bound, the optimal N is about the number of processors in your system.
- If your work is disk bound, performance may actually go down as you increase N, because multiple workers will cause more random I/O.
- If your work pulls files from many remote computers (think web crawling), the optimal N might be very high (100 or even 1000).
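Putting these steps together, here is a minimal, self-contained sketch; the file names are hypothetical, N is fixed at 4 workers, and a sync.WaitGroup is used so the "done work" channel can be closed once all workers finish:

package main

import (
    "encoding/json"
    "fmt"
    "os"
    "sync"
)

type Result struct {
    Some string
}

func main() {
    files := []string{"./files/classA.json", "./files/classB.json", "./files/classC.json"}

    // 1) channels for "work to do" and "done work", both with some buffering
    work := make(chan string, len(files))
    done := make(chan Result, len(files))

    // 2) one goroutine generates the file list
    go func() {
        for _, path := range files {
            work <- path
        }
        close(work) // closing the channel ends the workers' range loops
    }()

    // 3) N worker goroutines read from "work", process, and send to "done"
    const N = 4
    var wg sync.WaitGroup
    for i := 0; i < N; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for path := range work {
                f, err := os.Open(path)
                if err != nil {
                    fmt.Println("open error:", err)
                    continue
                }
                var r Result
                if err := json.NewDecoder(f).Decode(&r); err != nil {
                    fmt.Println("decode error:", err)
                }
                f.Close()
                done <- r
            }
        }()
    }

    // close "done" once all workers have finished
    go func() {
        wg.Wait()
        close(done)
    }()

    // 4) the main routine waits on "done work" and prints the results
    for r := range done {
        fmt.Println(r)
    }
}

Because both channels are closed once their producers are finished, this version exits cleanly instead of deadlocking at the end.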

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow