Question

I am writing an application which writes to the mongodb rapidly. Too rapidly for mongodb and mgo to handle. My question is, is there a way for me to determine that mongo cannot keep up and start to block? But I also do not want to block unnecessarily. Here is a sample of code that emulates the problem:

package main

import (
  "labix.org/v2/mgo"
  "time"
  "fmt"
)

// in database name is a string and age is an int

type Dog struct{
  Breed string "breed"
}

type Person struct{
  Name string "name"
  Pet Dog `bson:",inline"`
  Ts        time.Time
}

func insert(session *mgo.Session, bob Person){
  err := session.DB("db_log").C("people").Insert(&bob)
  if err != nil {
    panic("Could not insert into database")
  }
}

func main() {
  session, _ := mgo.Dial("localhost:27017")
  bob := Person{Name : "Robert", Pet : Dog{}}
  i := 0
  for {
    time.Sleep(time.Duration(1) * time.Microsecond)
    i++
    go insert(session, bob)
  }
}

I often get errors like:

panic: Could not insert into database

or

panic: write tcp 127.0.0.1:27017: i/o timeout
Was it helpful?

Solution

I suspect you will get much better performance if you allow Go to use multiple threads and Copy() then Close() your sessions.

To answer your question, this probably a perfect use-case for a channel. Feed the items into the channel in one goroutine and consume them/write them to Mongo in another. You can adjust the size of the channel to suit your needs. The producer thread will block once the channel is full when it tries to send to it.

You may also want to play with the Safe() method settings. Setting W:0 will put Mongo in a "fire and forget" mode, which will dramatically speed up performance at the risk of losing some data. You can also change the timeout time.

OTHER TIPS

I haven't tested yet but I think this code should work. I get this issue after keep a session for long time so that I have timer to renew session every certain time.

package main

import (
  "gopkg.in/mgo.v2"
  "time"
  "fmt"
)

// in database name is a string and age is an int

type Dog struct{
  Breed string "breed"
}

type Person struct{
  Name string "name"
  Pet Dog `bson:",inline"`
  Ts        time.Time
}

func insert(session *mgo.Session, bob Person){
  err := session.DB("db_log").C("people").Insert(&bob)
  if err != nil {
    panic("Could not insert into database")
  }
}

func main() {
  current_session, _ := mgo.Dial("localhost:27017")
  using_session := current_session
  bob := Person{Name : "Robert", Pet : Dog{}}

  /*
  * this technical to prevent connect timeout after long time connection on mongodb from golang session
  * Idea is simple: the session will be renew after certain time such as 1 hour
  */
  //ticker := time.NewTicker(time.Hour * 1)

  //Set 10 seconds for test
  ticker := time.NewTicker(time.Second * 10)

  go func() {

    for t := range ticker.C {
      fmt.Println("Tick at", t)
      new_session := current_session.Copy()
      fmt.Printf("Current session here %p\n", current_session)
      fmt.Printf("New session here %p\n", new_session)
      using_session = new_session
      //setTimeout 30 second before close old sesion, to make sure current instance use current connection isn't affect
      //time.AfterFunc(time.Second * 30, func() { 

      //Set 2 seconds for test
      time.AfterFunc(time.Second * 2, func() { 

        //close previous session

        current_session.Close()
        current_session = new_session

        //assign to new session

      })

    }
  }()

  i := 0
  for {
    time.Sleep(time.Duration(1) * time.Microsecond)
    i++
    go insert(using_session, bob)
  }

}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top