I am working on a custom script that fetches data from a Rackspace Cloud Files container and builds a list of all the files in it (the container has around 100 million files). I have been trying to parallelize the code and am currently stuck.
```go
// function to read data from the channel and display it
// currently just displaying, but a lot of processing will be done on this data
func extractObjectItemsFromList(objListChan <-chan []string) {
	fmt.Println("ExtractObjectItemsFromList")
	for _, c := range <-objListChan {
		fmt.Println(urlPrefix, c, "\t", count)
	}
}

func main() {
	// fetching data using flags
	ao := gophercloud.AuthOptions{
		Username: *userName,
		APIKey:   *apiKey,
	}
	provider, err := rackspace.AuthenticatedClient(ao)
	if err != nil {
		logFatal(err)
	}
	client, err := rackspace.NewObjectStorageV1(provider, gophercloud.EndpointOpts{
		Region: *region,
	})
	if err != nil {
		logFatal(err)
	}
	// We have the option of filtering objects by their attributes
	opts := &objects.ListOpts{
		Full:   true,
		Prefix: *prefix,
	}

	var objectListChan = make(chan []string)
	go extractObjectItemsFromList(objectListChan)

	// Retrieve a pager (i.e. a paginated collection)
	pager := objects.List(client, *containerName, opts)

	// Not working
	// By default each page contains 10000 records
	// Define an anonymous function to be executed on each page's iteration
	lerr := pager.EachPage(func(page pagination.Page) (bool, error) {
		// Get a slice of object names
		objectList, err := objects.ExtractNames(page)
		if err != nil {
			logFatal(err)
		}
		objectListChan <- objectList
		return true, nil
	})
	if lerr != nil {
		logFatal(lerr)
	}
}
```

The following version, which prints the names inside the callback instead of sending them over a channel, works: it keeps fetching new pages and showing new records, 10000 per page.

```go
// By default each page contains 10000 records
// Define an anonymous function to be executed on each page's iteration
lerr := pager.EachPage(func(page pagination.Page) (bool, error) {
	// Get a slice of object names
	objectList, err := objects.ExtractNames(page)
	if err != nil {
		logFatal(err)
	}
	for _, o := range objectList {
		fmt.Println(o)
	}
	return true, nil
})
if lerr != nil {
	logFatal(lerr)
}
```
With the channel version, the first 10000 records are displayed, but then the program gets stuck and nothing happens. If I don't use a channel and just run the plain loop, it works perfectly fine, but that defeats the purpose of parallelizing.
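Your async worker receives one list from the channel, iterates over it, and exits. After that, nothing is receiving from the unbuffered `objectListChan`, so the next `objectListChan <- objectList` in the `EachPage` callback blocks forever, which is why only the first 10000 records appear. You need two loops: an outer one reading the channel (`range objListChan`) and an inner one iterating over the object list just received.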
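A minimal sketch of the fix follows. It assumes the page fetching behaves like `pager.EachPage`, which is simulated here by a hypothetical `eachPage` helper (with a simplified callback signature) so the example runs without Rackspace credentials. The worker uses the two loops described above, the producer closes the channel after the last page so the outer `range` loops can end, and a `sync.WaitGroup` keeps the caller from returning before the workers finish. Because several goroutines can `range` over the same channel, this also lets the per-object processing run in parallel; the worker count of 4 is arbitrary.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// eachPage is a hypothetical stand-in for pager.EachPage: it calls fn once
// per page and stops early if fn returns false.
func eachPage(pages [][]string, fn func([]string) bool) {
	for _, p := range pages {
		if !fn(p) {
			return
		}
	}
}

// extractObjectItemsFromList has the two loops: the outer loop receives
// pages until the channel is closed, the inner loop walks the names in the
// page it just received.
func extractObjectItemsFromList(objListChan <-chan []string, processed *int64, wg *sync.WaitGroup) {
	defer wg.Done()
	for objectList := range objListChan {
		for _, name := range objectList {
			_ = name // real per-object processing would go here
			atomic.AddInt64(processed, 1)
		}
	}
}

// run feeds every page to `workers` consumers and returns how many names
// were processed once all of them have finished.
func run(pages [][]string, workers int) int64 {
	objectListChan := make(chan []string)
	var processed int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go extractObjectItemsFromList(objectListChan, &processed, &wg)
	}

	eachPage(pages, func(objectList []string) bool {
		objectListChan <- objectList
		return true
	})
	// Closing the channel lets the workers' outer range loops terminate.
	close(objectListChan)
	// Wait so we don't return before every page has been processed.
	wg.Wait()
	return processed
}

func main() {
	pages := [][]string{{"a", "b", "c"}, {"d", "e"}, {"f"}}
	fmt.Println("processed", run(pages, 4), "names")
}
```

In the real program, the `objectListChan <- objectList` send would stay inside the `pager.EachPage` callback, and `close(objectListChan)` and `wg.Wait()` would go after `EachPage` returns. With an unbuffered channel, fetching pauses while all workers are busy, so only a few pages are held in memory at once; a small buffer would let the fetcher get a page or two ahead.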