I'm trying to use Go to insert a row into a Postgres table for each new message received from RabbitMQ, using a single DB connection that is opened in the init function of the code below.
Rather than opening just one connection, the code opens 497 connections and hits the Postgres connection limit, which causes the row inserts to stop...
I have tried following the advice in these questions: "opening and closing DB connection in Go app" and "open database connection inside a function". They say I should open one connection and use a global db variable, so that the main function can pass the SQL statement to the connection opened in the init function.
I thought I had done this, however a new connection is being opened for each new row so the code stops working once the postgres connection limit is reached...
I am new to Go and have limited programming experience. I have been trying to understand and resolve this issue for the past two days, and I could really use some help understanding where I am going wrong.
// db is the package-level database handle. init assigns it once and main
// issues its statements through it.
var db *sql.DB

// init opens the Postgres handle and pings the server. If either step fails
// the process is terminated through log.Fatal.
func init() {
	handle, openErr := sql.Open("postgres", "postgres://postgres:postgres@SERVER/PORT/DB")
	if openErr != nil {
		log.Fatal("Invalid DB config:", openErr)
	}
	db = handle

	// Ping forces a round trip so an unreachable server is reported at startup.
	pingErr := db.Ping()
	if pingErr != nil {
		log.Fatal("DB unreachable:", pingErr)
	}
}
// main (abbreviated excerpt): the RabbitMQ setup, the goroutine and the
// for-range loop over deliveries are elided here, which is why the closing
// "}" and "}()" further down have no visible opener. The full listing
// follows later in the file.
func main() {
// The RabbitMQ connection code goes here.
// Each received message is split into the Legend, Status and TimeStamp variables.
// Those variables are bound to the placeholders of sqlStatement.
sqlStatement := `
INSERT INTO heartbeat ("Legend", "Status", "TimeStamp")
VALUES ($1, $2, $3)
`
// sqlStatement is then passed to db.QueryRow.
// NOTE(review): the *sql.Row returned here is discarded without calling Scan.
// database/sql only hands the underlying connection back to the pool when
// Scan runs, so every message pins one more connection.
db.QueryRow(sqlStatement, Legend, Status, TimeStamp)
}
}()
// Block the main goroutine so the consumer keeps running.
<-forever
}
Full code is shown below:
package main
import (
"database/sql"
"log"
_ "github.com/lib/pq"
"github.com/streadway/amqp"
"strings"
)
// db is the package-level database handle. It is assigned once in init and
// used by the consumer goroutine in main to run the INSERT statements.
// NOTE(review): per the database/sql documentation a *sql.DB is a pool of
// connections rather than a single connection.
var db *sql.DB
// failOnError does nothing when err is nil. Otherwise it logs msg followed by
// the error text and terminates the process through log.Fatalf.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}
// init opens the Postgres handle stored in the package-level db variable and
// pings the server. If either step fails the process is terminated through
// log.Fatal.
func init() {
	handle, openErr := sql.Open("postgres", "postgres://postgres:postgres@192.168.1.69:5432/test?sslmode=disable")
	if openErr != nil {
		log.Fatal("Invalid DB config:", openErr)
	}
	db = handle

	// Ping forces a round trip so an unreachable server is reported at startup.
	pingErr := db.Ping()
	if pingErr != nil {
		log.Fatal("DB unreachable:", pingErr)
	}
}
// main connects to RabbitMQ, consumes the HEARTBEAT queue and stores every
// delivery as one row in the heartbeat table.
//
// Each message body is expected to be "Legend,Status,TimeStamp". Any fields
// after the third are ignored.
func main() {
	conn, err := amqp.Dial("amqp://Admin:Admin@192.168.1.69:50003/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"HEARTBEAT", // name
		false,       // durable
		false,       // delete when unused
		false,       // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	// The statement never changes, so it is defined once outside the loop.
	const sqlStatement = `
INSERT INTO heartbeat ("Legend", "Status", "TimeStamp")
VALUES ($1, $2, $3)
`

	forever := make(chan bool)
	go func() {
		for d := range msgs {
			fields := strings.Split(string(d.Body), ",")
			if len(fields) < 3 {
				// A malformed message can never be inserted, so reject it
				// without requeueing instead of panicking on fields[1]/fields[2].
				log.Printf("Discarding malformed message %q", d.Body)
				if nackErr := d.Nack(false, false); nackErr != nil {
					log.Printf("Failed to nack message: %s", nackErr)
				}
				continue
			}

			// Use Exec, not QueryRow: an INSERT returns no row, and a *sql.Row
			// that is never scanned keeps its connection checked out of the
			// pool. That leaked one connection per message until Postgres hit
			// its connection limit.
			if _, execErr := db.Exec(sqlStatement, fields[0], fields[1], fields[2]); execErr != nil {
				log.Printf("Failed to insert heartbeat: %s", execErr)
				// Requeue so the heartbeat is not lost on a database error.
				if nackErr := d.Nack(false, true); nackErr != nil {
					log.Printf("Failed to nack message: %s", nackErr)
				}
				continue
			}

			// The consumer is registered with auto-ack disabled, so each
			// stored message must be acknowledged explicitly.
			if ackErr := d.Ack(false); ackErr != nil {
				log.Printf("Failed to ack message: %s", ackErr)
			}
		}
	}()

	// Block the main goroutine so the consumer keeps running.
	<-forever
}