Question

I wrote a simple example that inserts 10 million records into MongoDB. I started by making it work sequentially. Then I looked up how to do concurrency and found goroutines. This seems like what I want, but it's not behaving as I expected. I added a WaitGroup to keep the program from exiting before all the goroutines finish, but I'm still having a problem.

I'll start with what's happening, then show the code. When I run the code without the goroutine, all 10 million records are inserted into MongoDB fine. However, when I add the goroutine, only an indeterminate number get inserted, generally around 8500, give or take a couple hundred. I checked the MongoDB log to see if it was having problems, and nothing shows up. So I'm not sure that's the cause; it could be, and just isn't being logged. Anyway, here's the code:

(Side note: I'm inserting one record at a time, but I've split the insert out into its own function so I can test inserting multiple records at a time in the future. I just haven't figured out how to do that with MongoDB yet.)

package main

import (
  "fmt"
  "labix.org/v2/mgo"
  "strconv"
  "time"
  "sync"
)

// structs
type Reading struct {
  Id   string
  Name string
}

var waitGroup sync.WaitGroup

// methods
func main() {
  // Setup timer
  startTime := time.Now()

  // Setup collection
  collection := getCollection("test", "readings")
  fmt.Println("collection complete: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

  // Setup readings
  readings := prepareReadings()
  fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

  // Insert readings
  for i := 1; i <= 1000000; i++ {
    waitGroup.Add(1)
    go insertReadings(collection, readings)

    // fmt.Print(".")

    if i % 1000 == 0 {
      fmt.Println("1000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
    }
  }
  waitGroup.Wait()

  fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}

func getCollection(databaseName string, tableName string) *mgo.Collection {
  session, err := mgo.Dial("localhost")

  if err != nil {
    // panic(err)
    fmt.Println("error getCollection:", err)
  }

  // defer session.Close()

  // Optional. Switch the session to a monotonic behavior.
  // session.SetMode(mgo.Monotonic, true)

  collection := session.DB(databaseName).C(tableName)

  return collection
}

func insertReadings(collection *mgo.Collection, readings []Reading) {
  err := collection.Insert(readings)

  if err != nil {
    // panic(err)
    fmt.Println("error insertReadings:", err)
  }

  waitGroup.Done()
}

func prepareReadings() []Reading {
  var readings []Reading
  for i := 1; i <= 1; i++ {
    readings = append(readings, Reading{Name: "Thing"})
  }

  return readings
}

Solution 2

I have since found that the problem is connections. The MongoDB driver wasn't bubbling up the fact that it was unable to get a connection. It couldn't get a connection because the load generated on the server exceeded the maxfiles limit.
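One common way to avoid that kind of connection exhaustion is to cap how many inserts run at the same time. The following is a minimal sketch, not the original poster's fix: insertBounded and the insert callback are hypothetical names, and the callback merely stands in for a real call such as collection.Insert. A buffered channel acts as a counting semaphore, so the loop blocks before starting a new goroutine once maxInFlight inserts are already running.

```go
package main

import (
	"fmt"
	"sync"
)

// Reading mirrors the struct from the question.
type Reading struct {
	Id   string
	Name string
}

// insertBounded (hypothetical helper) calls insert once per reading while
// never letting more than maxInFlight calls run at the same time. It returns
// the number of calls made and the highest concurrency it observed.
func insertBounded(readings []Reading, maxInFlight int, insert func(Reading) error) (calls, peak int) {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards calls, peak and inFlight
		inFlight int
	)
	sem := make(chan struct{}, maxInFlight) // counting semaphore

	for _, r := range readings {
		sem <- struct{}{} // blocks while maxInFlight inserts are running
		wg.Add(1)
		go func(r Reading) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this insert ends

			mu.Lock()
			calls++
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			if err := insert(r); err != nil {
				fmt.Println("error insertReadings:", err)
			}

			mu.Lock()
			inFlight--
			mu.Unlock()
		}(r)
	}
	wg.Wait()
	return calls, peak
}

func main() {
	readings := make([]Reading, 1000)
	for i := range readings {
		readings[i] = Reading{Name: "Thing"}
	}

	// The callback is an assumption standing in for a database insert.
	calls, peak := insertBounded(readings, 8, func(Reading) error { return nil })
	fmt.Println("calls:", calls, "peak concurrency:", peak)
}
```

The right value for maxInFlight depends on the server's file-descriptor and connection limits, so it is best treated as a tunable rather than a fixed number.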

OTHER TIPS

Program execution

A complete program is created by linking a single, unimported package called the main package with all the packages it imports, transitively. The main package must have package name main and declare a function main that takes no arguments and returns no value.

func main() { … }

Program execution begins by initializing the main package and then invoking the function main. When the function main returns, the program exits. It does not wait for other (non-main) goroutines to complete.

You didn't provide us with a simple, concise, compilable and executable example of your problem. Here's a stripped-down version of your code that works.

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

// structs
type Reading struct {
    Id   string
    Name string
}

var waitGroup sync.WaitGroup

func main() {
    // Setup timer
    startTime := time.Now()

    // Setup readings
    readings := prepareReadings()
    fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

    // Insert readings
    for i := 1; i <= 1000000; i++ {
        waitGroup.Add(1)
        go insertReadings(readings)

        // fmt.Print(".")

        if i%100000 == 0 {
            fmt.Println("100000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
        }
    }
    waitGroup.Wait()

    fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}

func insertReadings(readings []Reading) {
    waitGroup.Done()
}

func prepareReadings() []Reading {
    var readings []Reading
    for i := 1; i <= 1; i++ {
        readings = append(readings, Reading{Name: "Thing"})
    }
    return readings
}

Output:

readings prepared: 0.00
100000 readings queued for insert: 0.49
100000 readings queued for insert: 1.12
100000 readings queued for insert: 1.62
100000 readings queued for insert: 2.54
100000 readings queued for insert: 3.05
100000 readings queued for insert: 3.56
100000 readings queued for insert: 4.06
100000 readings queued for insert: 5.57
100000 readings queued for insert: 7.15
100000 readings queued for insert: 8.78
all readings inserted: 34.76

Now, build the program back up, piece-by-piece, and see where it starts to fail.
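While rebuilding, it can help to collect the errors from the goroutines instead of only printing them, so a run reports exactly how many inserts failed. This is a rough sketch under stated assumptions: insertAll and flakyInsert are hypothetical names, and flakyInsert only simulates a failing database call.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// insertAll (hypothetical helper) runs insert n times concurrently and
// returns every error produced, so failures are counted rather than lost.
func insertAll(n int, insert func(i int) error) []error {
	var wg sync.WaitGroup
	errCh := make(chan error, n) // buffered so no goroutine blocks on send

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := insert(i); err != nil {
				errCh <- err
			}
		}(i)
	}

	wg.Wait()
	close(errCh) // safe: every sender has finished

	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
	return errs
}

// flakyInsert is an assumed stand-in for a real insert; it fails on every
// tenth call to simulate a connection that could not be obtained.
func flakyInsert(i int) error {
	if i%10 == 0 {
		return errors.New("simulated connection failure")
	}
	return nil
}

func main() {
	const total = 100
	errs := insertAll(total, flakyInsert)
	fmt.Printf("%d of %d inserts failed\n", len(errs), total)
}
```

The error channel is sized to n, which is fine for a small test run; for millions of inserts a counter or a bounded worker pool would use less memory.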

Licensed under: CC-BY-SA with attribution