Postgres table batch updates using golang pgxpool not reflected in database

8.3k views Asked by At

I need to update a rather large psql table with newly transformed IDs (passed in as 'records'). I created the function below to utilize pgxpool's connection pool and request batching. If I apply these transactions using a different client, they update the db. The results printed out also indicate that 1 row was affected each time, yet when I retrieve those rows from the database they remain unchanged. Am I using the batching incorrectly somehow?

import (
    ...
    "github.com/jackc/pgconn"
    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"

    log "github.com/sirupsen/logrus"
)

// SetNewSubscriptionValuesBatch queues one UPDATE against the event table per
// entry in records (the map key is bound to $1 / id, the map value to
// $2 / subscription_id_2), sends them as a single pgx batch inside a
// transaction, and logs the command tag of every result.
//
// NOTE(review): no tx.Commit call appears anywhere in this function, and it
// returns nil unconditionally, so callers get no error signal from the batch.
func SetNewSubscriptionValuesBatch(dsn string, records map[string]string) error {
    
    var db *pgxpool.Pool

    // NOTE(review): the deferred Close is registered before err is checked.
    db, err := pgxpool.Connect(context.Background(), dsn)
    defer db.Close()
    if err != nil {
        panic(err)
    }
    
    // All queued statements below are sent on this transaction.
    tx, err := db.Begin(context.Background())
    if err != nil {
        panic(err)
    }

    b := &pgx.Batch{}

    // Queue one parameterized UPDATE per record; nothing is sent yet.
    for id, subID := range records {

    sqlStatement := `
UPDATE event
SET subscription_id_2 = $2
WHERE id = $1;`
        b.Queue(sqlStatement, id, subID)
    }
    
    batchResults := tx.SendBatch(context.Background(), b)
    
    // Read results until Exec reports an error. The log call also runs for
    // the iteration that returned the error, and the error value itself is
    // never inspected or returned.
    var berr error
    var result pgconn.CommandTag
    for berr == nil {
        result, berr = batchResults.Exec()
        log.WithField("result", result.String()).WithField("result.rows-affected", result.RowsAffected()).Info("batch-result")
    }
    return nil
}

Printout:

{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"","result.rows-affected":0,"time":"2020-10-14T16:47:25+01:00"}
1

There is 1 answer

0
Willeman On

Following the suggestion of @Adrian Klaver I made these changes to complete the transactions:

  1. The transaction still needs to be committed explicitly.

  2. The transaction's connection(s) will remain open following SendBatch. It may be necessary to release them before using the connection for the commit.

Here is the code following the changes:

import (
    ...
    "github.com/jackc/pgconn"
    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"

    log "github.com/sirupsen/logrus"
)

// SetNewSubscriptionValuesBatch sets event.subscription_id_2 for every entry
// in records: each map key is bound to the row id ($1) and each map value to
// the new subscription id ($2). All updates are queued into one pgx batch and
// sent inside a single transaction, which is committed only if every
// statement in the batch succeeds.
//
// The function opens its own connection pool for dsn and closes it before
// returning. Connection, statement and commit errors are returned to the
// caller; on any error the transaction is rolled back.
func SetNewSubscriptionValuesBatch(dsn string, records map[string]string) error {
    // The statement is identical for every record, so define it once.
    const sqlStatement = `
UPDATE event
SET subscription_id_2 = $2
WHERE id = $1;`

    ctx := context.Background()

    db, err := pgxpool.Connect(ctx, dsn)
    if err != nil {
        return err
    }
    // Register Close only once the pool is known to be valid.
    defer db.Close()

    tx, err := db.Begin(ctx)
    if err != nil {
        return err
    }
    // Undo everything on any early return. After a successful Commit the
    // transaction is already closed and Rollback has nothing to undo, so its
    // error is deliberately ignored.
    defer func() { _ = tx.Rollback(ctx) }()

    b := &pgx.Batch{}
    for id, subID := range records {
        b.Queue(sqlStatement, id, subID)
    }

    batchResults := tx.SendBatch(ctx, b)

    // Read exactly one result per queued statement so that a failing UPDATE
    // is reported instead of being mistaken for the end of the batch.
    for i := 0; i < b.Len(); i++ {
        if _, err := batchResults.Exec(); err != nil {
            // The Exec error is the one worth reporting; Close here only
            // releases the connection for the deferred rollback.
            _ = batchResults.Close()
            return err
        }
    }

    // The batch must be closed before the connection can be used to commit.
    if err := batchResults.Close(); err != nil {
        return err
    }

    return tx.Commit(ctx)
}