Waiting for all goroutines to finish and combining the results

I'm having a hard time understanding goroutines, channels, and the sync package. I believe I understand the individual concepts, but I'm missing a few pieces that would connect them. Most of the examples I've found are also too simple for me to get a proper grasp of what's actually happening.

I'm writing a simple analytics tool for websites. One of its features is to check whether all links on a website are reachable. A website usually contains many links, so this seems like a great candidate for goroutines. The catch is that after starting all the goroutines I need to collect every result so I can present them to the user all at once.

What I have so far is:

func links(u *url.URL, d *goquery.Document) (links []models.Link) {
    wg := sync.WaitGroup{}

    d.Find("a[href]").Each(func(index int, item *goquery.Selection) {
        go func() {
            wg.Add(1)
            href, _ := item.Attr("href")
            url, _ := url.Parse(href)
            var internal bool

            if url.Host == "" {
                url.Scheme = u.Scheme
                url.Host = u.Host
            }

            links = append(links, models.Link{
                URL:       url,
                Reachable: Reachable(url.String()),
            })

            wg.Done()
        }()
    })

    wg.Wait()

    return
}

func Reachable(u string) bool {
    res, err := http.Head(u)
    if err != nil {
        return false
    }

    return res.StatusCode == 200
}

My code seems to work, but I feel like I'm missing something (or at least that it could be better). I have a few concerns and questions:

  1. If the website contains 1000 links, I'd start 1000 goroutines, which doesn't seem smart. I'd probably need a worker pool or something similar, right?
  2. Is it possible to use only channels for this example? I don't know how many links goquery will find, so I can't easily range over the elements sent to a channel. I also can't easily send a done message on another channel, because I don't know when Each will finish. Every for range over the channel blocks, so the app ends up synchronous again.
  3. I believe this is a common pattern: you iterate over something, do some async work on each iteration, and gather all the results when it's over. I can't wrap my head around it, and I don't know how to approach such a case.

There is 1 answer

Best answer, by maxm

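You could use a semaphore to constrain the concurrency. This still spawns one goroutine per link (1000 in your example), but ensures that at most 5 HTTP requests are in flight at any given time. Change the value of maxParallel to raise or lower that limit. Two other differences from your version matter: wg.Add(1) is called before the goroutine starts, so wg.Wait() cannot return before a goroutine has registered itself, and results are sent over a channel so that only one goroutine appends to links. Appending to the same slice from many goroutines at once is a data race.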

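func links(u *url.URL, d *goquery.Document) (links []models.Link) {
    wg := sync.WaitGroup{}
    linkChan := make(chan models.Link)
    doneChan := make(chan struct{})
    maxParallel := 5
    // A buffered channel used as a counting semaphore: at most maxParallel
    // goroutines can hold a slot at the same time.
    semaphore := make(chan struct{}, maxParallel)
    d.Find("a[href]").Each(func(index int, item *goquery.Selection) {
        // Add before starting the goroutine so wg.Wait cannot return early.
        wg.Add(1)
        go func() {
            defer wg.Done()
            semaphore <- struct{}{}        // acquire a slot
            defer func() { <-semaphore }() // release it on return

            href, _ := item.Attr("href")
            parsed, err := url.Parse(href)
            if err != nil {
                return // skip hrefs that are not valid URLs
            }

            if parsed.Host == "" {
                parsed.Scheme = u.Scheme
                parsed.Host = u.Host
            }
            linkChan <- models.Link{
                URL:       parsed,
                Reachable: Reachable(parsed.String()),
            }
        }()
    })
    // Signal on doneChan once every goroutine has finished.
    go func() {
        wg.Wait()
        doneChan <- struct{}{}
    }()

    // Drain the channel. Only this goroutine appends to links.
    for {
        select {
        case l := <-linkChan:
            links = append(links, l)
        case <-doneChan:
            return
        }
    }
}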

func Reachable(u string) bool {
    res, err := http.Head(u)
    if err != nil {
        return false
    }
    // Close the body so the underlying connection can be reused.
    defer res.Body.Close()

    return res.StatusCode == http.StatusOK
}
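
On questions 1 and 2: if you would rather not start one goroutine per link, a fixed-size worker pool built only from channels and a WaitGroup is one option. The sketch below is a minimal illustration, not a drop-in replacement. It assumes the links have already been collected into a slice of strings, and the names Result, checkAll, and the check parameter are hypothetical stand-ins for models.Link, links, and Reachable. The key point is that the results channel is closed after wg.Wait() returns, so a plain for range over it terminates on its own even though the number of results is not known in advance.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Result pairs a URL with the outcome of its check. It is a hypothetical
// stand-in for models.Link from the question.
type Result struct {
	URL       string
	Reachable bool
}

// checkAll runs check on every URL using a fixed number of worker goroutines.
// Results are returned in completion order, not input order.
func checkAll(urls []string, workers int, check func(string) bool) []Result {
	if workers < 1 {
		workers = 1 // with zero workers nothing would ever read from jobs
	}
	jobs := make(chan string)
	results := make(chan Result)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The loop ends when jobs is closed and empty.
			for u := range jobs {
				results <- Result{URL: u, Reachable: check(u)}
			}
		}()
	}

	// Feed jobs from a separate goroutine so that this function can start
	// receiving results immediately.
	go func() {
		for _, u := range urls {
			jobs <- u
		}
		close(jobs)
	}()

	// Close results once every worker has returned. This is what lets the
	// range loop below finish.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []Result
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	// A fake check so the sketch runs without network access.
	fake := func(u string) bool { return strings.HasPrefix(u, "https://") }
	urls := []string{"https://a.example", "http://b.example", "https://c.example"}
	for _, r := range checkAll(urls, 2, fake) {
		fmt.Println(r.URL, r.Reachable)
	}
}
```

In real use you would pass Reachable as the check function. Compared with the semaphore version, this starts only as many goroutines as there are workers, at the cost of needing the list of links up front.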