I have since found that it was a matter of connections. The MongoDB driver wasn't bubbling up the fact that it was unable to get a connection. It couldn't get one because the load being generated on the server exceeded the maxfiles limit.
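One common way to keep that kind of load under a fixed ceiling is to cap how many goroutines do work at the same time, for example with a buffered channel used as a counting semaphore. The following is only a minimal sketch under assumptions: `runBounded` and its `work` callback are hypothetical names standing in for the real insert call, and the limit of 16 is an arbitrary value that would need to be chosen well below the open-file limit on the actual machine.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded calls work(id) for id in [0, total) in separate goroutines,
// but never lets more than limit of them run at once (limit must be >= 1).
// It returns the highest number of calls that were in flight together.
// The name and signature are hypothetical, chosen for illustration.
func runBounded(total, limit int, work func(id int)) int64 {
	var (
		wg       sync.WaitGroup
		inFlight int64
		peak     int64
	)
	sem := make(chan struct{}, limit) // counting semaphore with limit slots

	for i := 0; i < total; i++ {
		sem <- struct{}{} // blocks while limit goroutines are already running
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this goroutine ends

			n := atomic.AddInt64(&inFlight, 1)
			for { // record a new peak if this is the busiest moment so far
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			work(id) // stand-in for the real insert
			atomic.AddInt64(&inFlight, -1)
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	peak := runBounded(1000, 16, func(id int) {})
	fmt.Println("peak concurrency:", peak)
}
```

Because the slot is acquired in the loop before the goroutine starts, the loop itself slows down to the pace of the workers instead of queueing a million goroutines up front.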
Just how do goroutines work, and do they die when the main process finishes?
Question
I scripted up a simple little example that inserts 10 million records into MongoDB. I started out by making it work sequentially. Then I looked up how to do concurrency, and found goroutines. This seems like what I want, but it's not behaving as I would expect. I added a WaitGroup to keep the program from exiting before all the goroutines were finished, but I'm still having a problem.
So I'll start with what's happening, then show the code. When I run the code without the goroutine, all 10 million records are inserted into MongoDB fine. However, when I add the goroutine, only some indeterminate number get inserted, generally around 8500, give or take a couple hundred. I checked the MongoDB log to see if it was having problems and nothing is showing up. So I'm not sure that's the cause; it could be, and just isn't being logged. Anyway, here's the code:
(Side note: I'm inserting 1 record at a time, but I've split it out into a method so I can test inserting multiple records at a time in the future. I just haven't figured out how to do that with MongoDB yet.)
package main

import (
	"fmt"
	"labix.org/v2/mgo"
	"strconv"
	"sync"
	"time"
)

// structs
type Reading struct {
	Id   string
	Name string
}

var waitGroup sync.WaitGroup

// methods
func main() {
	// Setup timer
	startTime := time.Now()

	// Setup collection
	collection := getCollection("test", "readings")
	fmt.Println("collection complete: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

	// Setup readings
	readings := prepareReadings()
	fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

	// Insert readings
	for i := 1; i <= 1000000; i++ {
		waitGroup.Add(1)
		go insertReadings(collection, readings)
		// fmt.Print(".")
		if i%1000 == 0 {
			fmt.Println("1000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
		}
	}
	waitGroup.Wait()
	fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}

func getCollection(databaseName string, tableName string) *mgo.Collection {
	session, err := mgo.Dial("localhost")
	if err != nil {
		// panic(err)
		fmt.Println("error getCollection:", err)
	}
	// defer session.Close()
	// Optional. Switch the session to a monotonic behavior.
	// session.SetMode(mgo.Monotonic, true)
	collection := session.DB(databaseName).C(tableName)
	return collection
}

func insertReadings(collection *mgo.Collection, readings []Reading) {
	err := collection.Insert(readings)
	if err != nil {
		// panic(err)
		fmt.Println("error insertReadings:", err)
	}
	waitGroup.Done()
}

func prepareReadings() []Reading {
	var readings []Reading
	for i := 1; i <= 1; i++ {
		readings = append(readings, Reading{Name: "Thing"})
	}
	return readings
}
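On the side note about inserting several records per call: in Go, passing a `[]Reading` as a single argument hands the callee one value (the slice itself), not its elements. If the driver's `Insert` method is variadic, i.e. `Insert(docs ...interface{}) error` (which is how mgo's `Collection.Insert` is declared, as far as I know), each record has to be converted to `interface{}` and the slice expanded with `...`. The sketch below uses a hypothetical `inserter` interface and an in-memory `fakeCollection` in place of a real collection so that it runs without a database; `insertBatch` is likewise an illustrative name.

```go
package main

import "fmt"

type Reading struct {
	Id   string
	Name string
}

// inserter is a hypothetical stand-in for any collection type that
// exposes a variadic Insert method.
type inserter interface {
	Insert(docs ...interface{}) error
}

// fakeCollection is an in-memory substitute for a real collection;
// it only records the documents it receives.
type fakeCollection struct {
	stored []interface{}
}

func (f *fakeCollection) Insert(docs ...interface{}) error {
	f.stored = append(f.stored, docs...)
	return nil
}

// insertBatch passes every reading as its own document in a single call.
func insertBatch(c inserter, readings []Reading) error {
	docs := make([]interface{}, len(readings))
	for i, r := range readings {
		docs[i] = r // a []Reading cannot be used as []interface{} directly
	}
	return c.Insert(docs...)
}

func main() {
	c := &fakeCollection{}
	readings := []Reading{{Name: "A"}, {Name: "B"}, {Name: "C"}}
	if err := insertBatch(c, readings); err != nil {
		fmt.Println("error insertBatch:", err)
	}
	fmt.Println("documents stored:", len(c.stored)) // prints "documents stored: 3"
}
```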
Solution 2
OTHER TIPS
A complete program is created by linking a single, unimported package called the main package with all the packages it imports, transitively. The main package must have package name main and declare a function main that takes no arguments and returns no value.
func main() { … }
Program execution begins by initializing the main package and then invoking the function main. When the function main returns, the program exits. It does not wait for other (non-main) goroutines to complete.
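To make that rule concrete, here is a small sketch of the usual pattern for keeping the caller alive until its goroutines are done, using `sync.WaitGroup`. The function name `runAll` is made up for this illustration; the goroutine body just bumps a counter in place of real work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll starts n goroutines and blocks until every one has finished.
// It returns how many goroutines ran to completion.
func runAll(n int) int64 {
	var wg sync.WaitGroup
	var done int64

	for i := 0; i < n; i++ {
		wg.Add(1) // register the goroutine before starting it
		go func() {
			defer wg.Done() // signal completion even on early return
			atomic.AddInt64(&done, 1)
		}()
	}

	// Without this Wait, the caller could continue (and main could return)
	// before some or all of the goroutines had a chance to run.
	wg.Wait()
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println("goroutines completed:", runAll(1000)) // prints "goroutines completed: 1000"
}
```

Calling `Add` in the launching loop rather than inside the goroutine matters: otherwise `Wait` could observe a zero counter and return before any goroutine has registered itself.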
You didn't provide us with a simple, concise, compilable and executable example of your problem. Here's a stripped-down version of your code that works.
package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

// structs
type Reading struct {
	Id   string
	Name string
}

var waitGroup sync.WaitGroup

func main() {
	// Setup timer
	startTime := time.Now()

	// Setup readings
	readings := prepareReadings()
	fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

	// Insert readings
	for i := 1; i <= 1000000; i++ {
		waitGroup.Add(1)
		go insertReadings(readings)
		// fmt.Print(".")
		if i%100000 == 0 {
			fmt.Println("100000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
		}
	}
	waitGroup.Wait()
	fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}

func insertReadings(readings []Reading) {
	waitGroup.Done()
}

func prepareReadings() []Reading {
	var readings []Reading
	for i := 1; i <= 1; i++ {
		readings = append(readings, Reading{Name: "Thing"})
	}
	return readings
}
Output:
readings prepared: 0.00
100000 readings queued for insert: 0.49
100000 readings queued for insert: 1.12
100000 readings queued for insert: 1.62
100000 readings queued for insert: 2.54
100000 readings queued for insert: 3.05
100000 readings queued for insert: 3.56
100000 readings queued for insert: 4.06
100000 readings queued for insert: 5.57
100000 readings queued for insert: 7.15
100000 readings queued for insert: 8.78
all readings inserted: 34.76
Now, build the program back up, piece-by-piece, and see where it starts to fail.
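While adding pieces back, it can help to make each goroutine report its result instead of only printing, so that failures are counted and grouped rather than lost in the output. The sketch below is one way to do that with a buffered channel; it is an illustration under assumptions, not part of the program above. `tallyErrors`, the `work` callback, and the simulated error `errNoConn` are all hypothetical, and the failure pattern in `main` is invented purely to exercise the code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNoConn is a simulated failure used only for this illustration.
var errNoConn = errors.New("simulated: no connection available")

// tallyErrors runs work(i) for i in [0, total) in separate goroutines and
// returns the number of successful calls plus a count per error message.
func tallyErrors(total int, work func(i int) error) (ok int, failures map[string]int) {
	results := make(chan error, total) // buffered so no sender ever blocks
	var wg sync.WaitGroup

	for i := 0; i < total; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results <- work(i) // nil on success
		}(i)
	}
	wg.Wait()
	close(results) // safe: every sender has finished

	failures = make(map[string]int)
	for err := range results {
		if err != nil {
			failures[err.Error()]++
		} else {
			ok++
		}
	}
	return ok, failures
}

func main() {
	ok, failures := tallyErrors(100, func(i int) error {
		if i%10 == 0 { // pretend every tenth call fails
			return errNoConn
		}
		return nil
	})
	fmt.Println("succeeded:", ok)
	for msg, n := range failures {
		fmt.Println(n, "x", msg)
	}
}
```

If the number of successes plus failures matches the number of goroutines started, the goroutines themselves are fine and the missing records point at the piece that was just added.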