In the below snippet I tried to perform GET request using POOL of worker.
I thought it will reuse the TCP session instead of creating new one for every worker, but it creates multiple session.
Could someone please help in validating the code:
package main
import (
"fmt"
"net/http"
"sync"
)
//var URL = "http://localhost:8181/api/v1/resources"
var URL = "http://httpbin.org/ip"
// Task interface
type Task interface {
Execute(p *Pool)
}
// JobDesc struct
type JobDesc struct {
Client *http.Client
}
// Pool struct
type Pool struct {
mu sync.Mutex
size int
tasks chan Task
kill chan struct{}
wg sync.WaitGroup
Results chan interface{}
}
// Methods for Pool struct.
func (p *Pool) worker() {
defer p.wg.Done()
for {
select {
case task, ok := <-p.tasks:
if !ok {
return
}
task.Execute(p)
case <-p.kill:
return
}
}
}
// Resize methods update the number of goroutine in the pool, to execute the task.
func (p *Pool) Resize(n int) {
p.mu.Lock()
defer p.mu.Unlock()
for p.size < n {
p.size++
p.wg.Add(1)
go p.worker()
}
for p.size > n {
p.size--
p.kill <- struct{}{}
}
}
// CloseJobChannel method close the jobs channel,
// once all the jobs are sending to the channel.
func (p *Pool) CloseJobChannel() {
close(p.tasks)
}
// CloseResultChannel method close the results channel,
// once all the jobs are completed and results are collected.
func (p *Pool) CloseResultChannel() {
close(p.Results)
}
// Wait Method of Pool struct
func (p *Pool) Wait() {
p.wg.Wait()
}
// AddJob Method add task to the task channel
func (p *Pool) AddJob(task Task) {
p.tasks <- task
}
// NewPool function create pool of task base on the size
// provided.
func NewPool(size int) *Pool {
pool := &Pool{
tasks: make(chan Task, 10000),
kill: make(chan struct{}),
Results: make(chan interface{}, 10000),
}
pool.Resize(size)
return pool
}
// Execute to execute
func (job *JobDesc) Execute(p *Pool) {
res, err := job.Client.Get(URL)
if err != nil {
fmt.Println(err)
}
//body, _ := ioutil.ReadAll(res.Body)
fmt.Println(res.Body)
//defer res.Body.Close()
}
func main() {
pool := NewPool(10)
tr := &http.Transport{DisableKeepAlives: false}
client := &http.Client{Transport: tr}
for i := 1; i <= 50; i++ {
job := JobDesc{Client: client}
pool.AddJob(&job)
}
pool.CloseJobChannel()
pool.Wait()
pool.CloseResultChannel()
i := 1
for k := range pool.Results {
fmt.Printf("%d, %d", i, k)
i++
}
}