DB calls in goroutine failing without error


I wrote a script to migrate lots of data from one DB to another and got it working fine, but now I want to use goroutines to speed up the script by making concurrent DB calls. Since changing the call from processBatch(offset) to go processBatch(offset), I can see that a few goroutines are started, but the script finishes almost instantly and nothing is actually done. The number of goroutines that start also varies every time I run the script. There are no errors (that I can see).

I'm still new to goroutines and Go in general, so any pointers as to what I might be doing wrong are much appreciated. I have removed all logic from the code below that is not related to concurrency or DB access, as it runs fine without the changes. I also left a comment where I believe it fails: nothing below that line is run (a Println placed there gives no output). I also tried using sync.WaitGroup to stagger DB calls, but it didn't seem to change anything.

var (
    legacyDB     *sql.DB
    v2DB         *sql.DB
)

func main() {

    var total, loops int
    var err error

    legacyDB, err = sql.Open("mysql", "...")
    if err != nil {
        panic(err)
    }
    defer legacyDB.Close()

    v2DB, err = sql.Open("mysql", "...")
    if err != nil {
        panic(err)
    }
    defer v2DB.Close()

    err = legacyDB.QueryRow("SELECT count(*) FROM users").Scan(&total)
    checkErr(err)

    loops = int(math.Ceil(float64(total) / float64(batchsize)))

    fmt.Println("Total: " + strconv.Itoa(total))
    fmt.Println("Loops: " + strconv.Itoa(loops))

    for i := 0; i < loops; i++ {
        offset := i * batchsize

        go processBatch(offset)
    }

    legacyDB.Close()
    v2DB.Close()
}

func processBatch(offset int) {

    query := namedParameterQuery.NewNamedParameterQuery(`
        SELECT ...
        LIMIT :offset,:batchsize
    `)
    query.SetValue(...)

    rows, err := legacyDB.Query(query.GetParsedQuery(), (query.GetParsedParameters())...)
    // nothing after this line gets done (Println here does not show output)
    checkErr(err)
    defer rows.Close()

    ....

    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    log.Printf("\nAlloc = %v\nTotalAlloc = %v\nSys = %v\nNumGC = %v\n\n", m.Alloc/1024/1024, m.TotalAlloc/1024/1024, m.Sys/1024/1024, m.NumGC)
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}
Best answer, by 1lann

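As Nadh mentioned in a comment, this happens because a Go program exits as soon as the main function returns, regardless of whether other goroutines are still running. In your main, the loop only starts the goroutines; main then immediately closes both databases and returns, so the batches are cut off before their queries complete. How many goroutines get to start before that happens depends on scheduling, which is why the number varies between runs. To fix this, a *sync.WaitGroup will suffice. A WaitGroup is used when you have multiple concurrent operations and want to wait until they have all completed. Documentation can be found here: https://golang.org/pkg/sync/#WaitGroup.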
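To see the pattern in isolation, here is a minimal sketch with no database involved. The function runBatches and the map it fills are hypothetical stand-ins for the question's main loop and processBatch: each goroutine just records its offset, and wg.Wait() blocks until all of them have called Done. Removing the wg.Wait() call would let the function return before most of the goroutines have run, which is the same situation as main returning in the question.

```go
package main

import (
	"fmt"
	"sync"
)

// runBatches starts one goroutine per batch and waits for all of them.
// The goroutine body is a hypothetical stand-in for processBatch:
// it only records the offset it was given.
func runBatches(loops, batchsize int) map[int]bool {
	var (
		mu   sync.Mutex // guards done, which every goroutine writes to
		done = make(map[int]bool)
		wg   sync.WaitGroup
	)

	wg.Add(loops) // one count per goroutine about to be started

	for i := 0; i < loops; i++ {
		offset := i * batchsize
		go func(offset int) {
			defer wg.Done() // mark this batch as finished when the function returns
			mu.Lock()
			done[offset] = true
			mu.Unlock()
		}(offset)
	}

	// Block until every goroutine has called Done. Without this, the function
	// (and, in the question's case, main) would return before the work ran.
	wg.Wait()
	return done
}

func main() {
	done := runBatches(5, 100)
	fmt.Println("batches completed:", len(done)) // batches completed: 5
}
```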

An example implementation for your program, with the WaitGroup kept local to main rather than declared as a global variable, is to replace

fmt.Println("Total: " + strconv.Itoa(total))
fmt.Println("Loops: " + strconv.Itoa(loops))

for i := 0; i < loops; i++ {
    offset := i * batchsize

    go processBatch(offset)
}

with

fmt.Println("Total: " + strconv.Itoa(total))
fmt.Println("Loops: " + strconv.Itoa(loops))

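wg := new(sync.WaitGroup)
wg.Add(loops) // one count per batch goroutine

for i := 0; i < loops; i++ {
    offset := i * batchsize

    // Pass offset as an argument so each goroutine gets its own copy.
    go func(offset int) {
        defer wg.Done() // mark this batch as finished when processBatch returns
        processBatch(offset)
    }(offset)
}

// Block until every batch has called Done; only then does main go on to
// close the databases and return.
wg.Wait()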
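On a large table, loops can be in the thousands, and the code above starts every batch at once, which means that many simultaneous queries. If you want to stagger the DB calls, one common approach is a buffered channel used as a semaphore alongside the WaitGroup. The sketch below is only an illustration under assumptions: maxConcurrent is a hypothetical limit, and processBatchStub is a hypothetical stand-in for processBatch that sleeps instead of querying and tracks how many batches run at the same time. With a real *sql.DB you may also want to cap the connection pool with db.SetMaxOpenConns.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// processBatchStub is a hypothetical stand-in for processBatch. It records
// how many batches are running at once so the limit can be observed.
func processBatchStub(offset int, running, peak *int64) {
	n := atomic.AddInt64(running, 1)
	for { // raise peak to n if n is the highest value seen so far
		p := atomic.LoadInt64(peak)
		if n <= p || atomic.CompareAndSwapInt64(peak, p, n) {
			break
		}
	}
	time.Sleep(10 * time.Millisecond) // pretend to query and insert
	atomic.AddInt64(running, -1)
}

// runLimited processes every batch but lets at most maxConcurrent run at once.
// It returns the number of batches processed and the observed peak concurrency.
func runLimited(loops, batchsize, maxConcurrent int) (int, int64) {
	var (
		wg        sync.WaitGroup
		running   int64
		peak      int64
		processed int64
	)
	sem := make(chan struct{}, maxConcurrent) // buffered channel used as a semaphore

	for i := 0; i < loops; i++ {
		offset := i * batchsize
		wg.Add(1)
		sem <- struct{}{} // blocks while maxConcurrent batches are already running
		go func(offset int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot for the next batch
			processBatchStub(offset, &running, &peak)
			atomic.AddInt64(&processed, 1)
		}(offset)
	}

	wg.Wait() // every batch has finished once this returns
	return int(processed), peak
}

func main() {
	processed, peak := runLimited(20, 100, 4)
	fmt.Println("processed:", processed, "peak concurrency:", peak)
}
```

Because main acquires a slot before starting each goroutine, the loop itself pauses whenever maxConcurrent batches are in flight, so at most that many queries run at once and all batches still finish before wg.Wait() returns.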