ForkJoinFramework only uses two workers


I have an application which crawls around six thousand URLs. To minimize this work, I created a RecursiveTask which consumes a ConcurrentLinkedQueue of all URLs to crawl. It splits off up to 50 URLs; if the queue is then empty, it crawls the subset directly, but if not, it first creates a new instance of itself and forks it, then crawls the subset of 50, and afterwards joins the forked task.

Now comes my problem: until each thread has worked off its first 50, all four work quickly and at the same time. But then two stop working and wait in join, and only the other two keep working, creating new forks and crawling pages.

To visualize this, I count how many URLs each thread crawls and display it in a JavaFX GUI.

What am I doing wrong, so that the ForkJoinFramework only uses two of my four allowed threads? What can I do to change it?
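For reference, the "four allowed threads" correspond to the pool's target parallelism, which is set when the ForkJoinPool is constructed; a minimal sketch (the pool variable name is illustrative):

```java
import java.util.concurrent.ForkJoinPool;

public class PoolDemo {
    public static void main(String[] args) {
        // A pool with a target parallelism of four worker threads
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.getParallelism()); // prints 4
        pool.shutdown();
    }
}
```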

Here is the compute method of my task:

    LOG.debug(
       Thread.currentThread().getId() + " Starting new Task with " 
          + urlsToCrawl.size() + " left."
    );
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++)
    {
        urlsToCrawlSubset.offer(urlsToCrawl.poll());
    }
    LOG.debug(
       Thread.currentThread().getId() + " Created a subset with " 
       + urlsToCrawlSubset.size() + "."
    );
    LOG.debug(
       Thread.currentThread().getId() 
       + " Now only " + urlsToCrawl.size() + " URLs are left to crawl."
    );

    if (urlsToCrawl.isEmpty())
    {
        LOG.debug(Thread.currentThread().getId() + " Crawling the subset.");
        crawlPage(urlsToCrawlSubset);
    }
    else
    {
        LOG.debug(
           Thread.currentThread().getId() 
              + " Creating a new Task and crawling the subset."
        );
        final AbstractUrlTask<T, D> otherTask = createNewOwnInstance();
        otherTask.fork();
        crawlPage(urlsToCrawlSubset);
        taskResults.addAll(otherTask.join());
    }
    return taskResults;

And here is a snapshot of my diagram: (diagram image omitted)

P.S.: If I allow up to 80 threads, it will use them until each has crawled 50 URLs, and then it uses only two again.

And if you're interested, here is the complete source code: https://github.com/mediathekview/MServer/tree/feature/cleanup

1 Answer

Answered by Nicklas2751 (best answer):

I fixed it. My error was that I split off a small portion, worked on it, and then waited, instead of splitting the queue in half and calling myself again with the other half, and so on.

In other words: before, I split and then immediately worked, but the correct way is to keep splitting until everything is split, and only then start working.
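The fix described here is the canonical fork/join divide-and-conquer shape: split in half, fork one half, compute the other half directly, and join only at the end. A self-contained sketch of that idea, using a range sum in place of crawling (all names here are illustrative, not from the project; the threshold of 50 mirrors getMaximumUrlsPerTask()):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class HalvingTaskDemo {
    static class RangeSum extends RecursiveTask<Long> {
        private static final int THRESHOLD = 50;
        private final long from, to; // sums the half-open range [from, to)

        RangeSum(long from, long to) { this.from = from; this.to = to; }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                // Small enough: do the actual work directly
                long sum = 0;
                for (long i = from; i < to; i++) sum += i;
                return sum;
            }
            final long mid = from + (to - from) / 2;
            final RangeSum left = new RangeSum(from, mid);
            final RangeSum right = new RangeSum(mid, to);
            left.fork();                     // hand one half to the pool
            long rightSum = right.compute(); // keep working on the other half ourselves
            return rightSum + left.join();   // block only after all splitting is done
        }
    }

    public static void main(String[] args) {
        long sum = ForkJoinPool.commonPool().invoke(new RangeSum(0, 6000));
        System.out.println(sum); // prints 17997000 (= 0 + 1 + ... + 5999)
    }
}
```

Because every task keeps computing one half itself and only joins after the recursion has bottomed out, idle workers can always steal a forked half, keeping the whole pool busy.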

Here is how my code looks now:

@Override
protected Set<T> compute()
{
    if (urlsToCrawl.size() <= config.getMaximumUrlsPerTask())
    {
        // Small enough: do the actual work directly
        crawlPage(urlsToCrawl);
    }
    else
    {
        // Split in half, fork one half, compute the other directly,
        // and join only once the recursion has finished splitting
        final AbstractUrlTask<T, D> rightTask = createNewOwnInstance(createSubSet(urlsToCrawl));
        final AbstractUrlTask<T, D> leftTask = createNewOwnInstance(urlsToCrawl);
        leftTask.fork();
        taskResults.addAll(rightTask.compute());
        taskResults.addAll(leftTask.join());
    }
    return taskResults;
}

private ConcurrentLinkedQueue<D> createSubSet(final ConcurrentLinkedQueue<D> aBaseQueue)
{
    final int halfSize = aBaseQueue.size() / 2;
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < halfSize; i++)
    {
        urlsToCrawlSubset.offer(aBaseQueue.poll());
    }
    return urlsToCrawlSubset;
}
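For illustration, the createSubSet helper above can be exercised on its own; a minimal standalone sketch, specialized to Integer instead of the generic D (the class name SubSetDemo is made up):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class SubSetDemo {
    // Standalone version of the createSubSet helper: polls the first
    // half of the base queue into a new queue and returns it
    static ConcurrentLinkedQueue<Integer> createSubSet(final ConcurrentLinkedQueue<Integer> aBaseQueue) {
        final int halfSize = aBaseQueue.size() / 2;
        final ConcurrentLinkedQueue<Integer> subset = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < halfSize; i++) {
            subset.offer(aBaseQueue.poll());
        }
        return subset;
    }

    public static void main(String[] args) {
        final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 101; i++) queue.offer(i);
        final ConcurrentLinkedQueue<Integer> half = createSubSet(queue);
        // 101 elements split into 50 (subset) and 51 (remaining)
        System.out.println(half.size() + " " + queue.size()); // prints "50 51"
    }
}
```

Note that the size is captured once up front, so the split takes exactly half even though polling shrinks the base queue as the loop runs.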