Inserting items using goroutines does not work properly

115 views Asked by At

I tried to insert Item by using goroutines ( 2000 goroutines)

so run below code


package main

import (
    "fmt"
    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
    "log"
    "sync"
)

// Item models one row of the items table; the db struct tags map each
// field to its column name for sqlx.
type Item struct {
    Id          int    `db:"id"`
    Title       string `db:"title"`
    Description string `db:"description"`
}

// ConnectPostgresDB -> connect postgres db
func ConnectPostgresDB() *sqlx.DB {
    connstring := "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=8080"
    db, err := sqlx.Open("postgres", connstring)
    if err != nil {
        fmt.Println(err)
        return db
    }
    return db
}

// InsertItem inserts one item into the items table inside its own
// transaction and signals wg when done.
//
// Fixes vs. the original:
//   - tx.Exec replaces tx.Queryx: Queryx returns a *sqlx.Rows that was
//     never closed, which keeps the underlying connection busy and leaks it.
//   - a failed INSERT is now rolled back and the function returns, instead
//     of committing the transaction anyway.
//
// NOTE(review): opening a fresh *sqlx.DB per call is still wasteful — 2000
// goroutines mean 2000 pools and is the root cause of "too many clients".
// Sharing a single db handle would require changing this signature.
func InsertItem(item Item, wg *sync.WaitGroup) {
    defer wg.Done()
    db := ConnectPostgresDB()
    defer db.Close()
    tx, err := db.Beginx()
    if err != nil {
        fmt.Println(err)
        return
    }

    _, err = tx.Exec("INSERT INTO items(id, title, description) VALUES($1, $2, $3)", item.Id, item.Title, item.Description)
    if err != nil {
        tx.Rollback()
        fmt.Println(err)
        return
    }

    err = tx.Commit()
    if err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("Data is Successfully inserted!!")
}





// main fires off the 2000 inserts, but bounds how many run at once.
// Each InsertItem call opens its own database connection, so launching all
// 2000 goroutines simultaneously exhausts the server's max_connections
// ("sorry, too many clients already") and most inserts silently fail.
func main() {
    var wg sync.WaitGroup
    //db, err := sqlx.Connect("postgres", "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=8080")

    // Counting semaphore: at most 10 inserts (and therefore at most 10
    // database connections) in flight at any time.
    sem := make(chan struct{}, 10)
    for i := 1; i <= 2000; i++ {
        item := Item{Id: i, Title: "TestBook", Description: "TestDescription"}
        //go GetItem(db, i, &wg)
        wg.Add(1)
        sem <- struct{}{} // blocks while 10 inserts are already running
        go func(item Item) {
            defer func() { <-sem }() // release the slot when done
            InsertItem(item, &wg)
        }(item)
    }
    wg.Wait()
    fmt.Println("All DB Connection is Completed")
}

After running the above code, I expected 2000 rows in the items table.

But in the items table, only about 150 rows exist.

The DB server reported a "too many clients" error, so I increased max_connections from 100 to 4000,

and then tried again

but the results were the same.

Does anybody know why this happens?

1

There are 1 answers

2
Markus W Mahlberg On BEST ANSWER

Note

All the code for the answer below is available under https://github.com/mwmahlberg/so-postgres.

The problem is that you do not really seem to note the cause of the error.

With a slightly adjusted main.go, the problem should become rather obvious:

package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "sync"
    "time"

    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
)

// schema creates the items table at startup if it does not already exist.
// id is the primary key, so inserting a duplicate id fails the INSERT.
const schema = `
CREATE TABLE IF NOT EXISTS items (
        id integer primary key,
        title text,
        description text
);
`

// Item models one row of the items table; the db struct tags map each
// field to its column name for sqlx.
type Item struct {
    Id          int    `db:"id"`
    Title       string `db:"title"`
    Description string `db:"description"`
}

var (
    // Make the waitgroup global: Easier to use and less error-prone
    wg sync.WaitGroup

    // Make the database URL a configurable flag
    dburl string
)

// init registers the -dburl command-line flag; main calls flag.Parse to
// populate dburl before it is used.
func init() {
    // Make the database URL a configurable flag
    flag.StringVar(&dburl, "dburl", "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=5432", "Postgres DB URL")
}

// handlePanics is a simple function to log the error that caused a panic and exit the program
func handlePanics() {
    if r := recover(); r != nil {
        log.Println("encountered panic: ", r)
        os.Exit(1)
    }
}

// InsertItem inserts an item into the database.
// Note that the db is passed as an argument.
func InsertItem(item Item, db *sqlx.DB) {
    defer wg.Done()
    // With the beginning of the transaction, a connection is acquired from the pool
    tx, err := db.Beginx()
    if err != nil {
        panic(fmt.Errorf("beginning transaction: %s", err))
    }

    _, err = tx.Exec("INSERT INTO items(id, title, description) VALUES($1, $2, $3)", item.Id, item.Title, item.Description)
    if err != nil {
        // the rollback is rather superfluous here
        // but it's good practice to include it
        tx.Rollback()

        // panic will cause the goroutine to exit and the waitgroup to decrement
        // Also, the handlePanics function will catch the panic and log the error
        panic(fmt.Errorf("inserting data %#v: %s", item, err))
    }

    err = tx.Commit()
    if err != nil {
        panic(fmt.Errorf("committing transaction: %s", err))
    }

    log.Printf("Inserted item with id %d\n", item.Id)
}

// main connects once, creates the schema, then fans out 2000 concurrent
// inserts over the single shared connection pool and reports the elapsed time.
func main() {
    // Log and exit cleanly on any panic raised in the main goroutine.
    defer handlePanics()

    flag.Parse()
    log.Printf("DB URL: %s\n", dburl)

    // One sqlx.DB is already a pool of connections; open it exactly once
    // and share it with every goroutine.
    db, err := sqlx.Connect("postgres", dburl)
    if err != nil {
        log.Fatalln(err)
    }
    defer db.Close()

    // Create the items table. MustExec panics on failure, which lands in
    // the deferred handlePanics above.
    db.MustExec(schema)

    start := time.Now()
    for i := 1; i <= 2000; i++ {
        wg.Add(1)
        go func(id int) {
            // Panic handlers are per-goroutine, so each worker installs
            // its own.
            defer handlePanics()
            InsertItem(Item{Id: id, Title: "TestBook", Description: "TestDescription"}, db)
        }(i)
    }
    wg.Wait()
    fmt.Printf("All DB Inserts completed after %s\n", time.Since(start))
}

And indeed the application logs an error in my test setup:

2024/02/25 16:41:27 encountered panic:  beginning transaction: pq: sorry, too many clients already

So, we need to add a control for that:


// Set the number of connections in the pool
// NOTE(review): database/sql's pool already blocks Beginx callers once
// MaxOpenConns connections are in use, so the hand-rolled admission
// control below is presumably redundant — verify against the sql.DB docs.
db.DB.SetMaxOpenConns(10)

// use the actual value
maxOpen := db.DB.Stats().MaxOpenConnections

var mutex sync.Mutex
for i := 1; i <= 2000; i++ {

    wg.Add(1)

    // For goroutines, you must explicitly set the panic handler
    go func(i int) {

        defer handlePanics()

        // use a label to ensure that the goroutine breaks out of inner loop
    waitForOpenConnection:
        for {
            // Lock the mutex to check the number of open connections.
            // We need to do this otherwise another goroutine could increment the number of open connections
            mutex.Lock()

            // Get the connections in the pool that are currently in use
            // NOTE(review): InUse appears to never exceed MaxOpenConnections,
            // which would make the case below always true — confirm.
            switch open := db.DB.Stats().InUse; {

            // If the number of open connections is less than the maximum, insert the item
            case open <= maxOpen:
                log.Println("Inserting item")
                InsertItem(Item{Id: i, Title: "TestBook", Description: "TestDescription"}, db)
                // Now that the item has been inserted, unlock the mutex and break out of the inner loop
                mutex.Unlock()
                break waitForOpenConnection
            default:
                // Allow other goroutines to read the number of open connections
                mutex.Unlock()
            }
        }
    }(i)
}

And sure enough, the result is as expected:

All DB Inserts completed after 514.022334ms

Quite some hassle for something so (deceivingly) simple, right?

Now here comes the REALLY troubling part:

"Concurrency is not parallelism."

Go proverb

If we have a look at the simplified version (full code in the aptly named "simplified" branch):

package main

...

const (
    // schema creates the items table at startup if it does not already exist.
    schema = `
CREATE TABLE IF NOT EXISTS items (
        id integer primary key,
        title text,
        description text
);
`
    // insert is the parameterized statement used by InsertItem; positional
    // placeholders keep the values out of the SQL text.
    insert = `
INSERT INTO items(id, title, description) VALUES($1, $2, $3)
`
)

...

// InsertItem inserts an item into the database.
// Note that the db is passed as an argument.
// InsertItem inserts one item into the items table in its own transaction,
// drawing a connection from the shared db pool. Every failure is escalated
// via panic for the caller's handlePanics to deal with.
func InsertItem(item Item, db *sqlx.DB) {
    // Beginning the transaction acquires a connection from the pool.
    tx, err := db.Beginx()
    if err != nil {
        panic(fmt.Errorf("beginning transaction: %s", err))
    }

    _, err = tx.Exec(insert, item.Id, item.Title, item.Description)
    if err != nil {
        // Rolling back a failed INSERT is largely a no-op, but it is good
        // practice to be explicit before bailing out.
        tx.Rollback()
        panic(fmt.Errorf("inserting data: %s", err))
    }

    if err = tx.Commit(); err != nil {
        panic(fmt.Errorf("committing transaction: %s", err))
    }
}

// main connects once, creates the schema, and inserts the 2000 items
// sequentially — no goroutines at all — then reports the elapsed time.
func main() {

    // Recover from panics and log the error for the main goroutine
    defer handlePanics()

    flag.Parse()
    log.Printf("DB URL: %s\n", dburl)

    var (
        db  *sqlx.DB
        err error
    )

    // Only open one connection to the database.
    // The postgres driver will open a pool of connections for you.
    if db, err = sqlx.Connect("postgres", dburl); err != nil {
        log.Fatalln(err)
    }
    defer db.Close()

    // Create the items table
    // Note that if this panics, the handlePanics function will catch it and log the error
    db.MustExec(schema)
    start := time.Now()

    // Set the number of connections in the pool
    db.DB.SetMaxOpenConns(10)

    for i := 1; i <= 2000; i++ {
        // Sequential inserts: the pool (capped at 10) reuses connections.
        // (The original comment about a label/goroutine was left over from
        // the concurrent version and did not apply here.)
        InsertItem(Item{Id: i, Title: "TestBook", Description: "TestDescription"}, db)
    }
    log.Printf("All DB Inserts completed after %s\n", time.Since(start))
}

Not only is main much more readable and a lot less complicated - it is of comparable speed. Actually, in my totally non-scientific tests all runs were on average 100ms faster than with the goroutine hassle.

Summary

To make a very long story short: Conventional wisdom has it that premature optimization is the root of all evil. And concurrency is not parallelism.

Do not use goroutines unless you have to, check your errors and react to them.