Executor framework abnormal behaviour

80 views Asked by At

Hi I am using Executors for loading some data in parallel. My application is fetching some data from DB which have parent-child relation like:

parent 1 -> [child11, child12,..., child1N]
parent 2 -> [child21, childy22,..., child2N]
.....
parent N -> [childN1, childyN2,..., childNN]

Now here I want parallel processing. What I am doing now is loading all data of a parent child set at a time from DB and calling executor service to map those in relationship and store in my data structure.

Now the code I have for this:

The Parent Child relation is like:

public class Post implements Serializable {

    private static final long serialVersionUID = 1L;

    private Integer postId;
    private String  postText;
    private String  postType;
    private Integer menuItemId;
    private boolean parentPost;
    private Integer parentPostId;
    // Contains all the Child of this Post
    private List<Post> answers = new ArrayList<Post>();
    ....
    //getters and setters
}

Now I have a wrapper for this Post class for synchronization

public class PostList {

    private List<Post> postList;

    public PostList() {
        super();
        this.postList = new ArrayList<Post>();
    }

    public List<Post> getPostList() {
        return postList;
    }

    public synchronized boolean add(Post post) {
        return postList.add(post);
    }

    public synchronized boolean addAnswer(Post answer) {
        for(Post post : postList)
        {
            if(post.getPostId() == answer.getParentPostId())
            {
                post.getAnswers().add(answer);
                break;
            }
        }
        return true;
    }
}

Now My Loading code from DB is:

/* This is called to load each parent-child set at a time, when the 
first set is fetched from DB then call to executor to store those in  
internal data structure. */

List<Post> posts = null;
PostList postList = null;
Integer args[] ={menuItemId};
// Fetch all Posts which are in parent child relation
posts = getDataFromDB(...)
if(posts != null && posts.size() >0)
{
    postList = new PostList();
    ExecutorService executor = Executors.newFixedThreadPool(10);
    for(Post post : posts)
    {
         executor.execute(new PostProcessor(post, postList));
    }
    logger.debug("Starting executor shutdown...");
    executor.shutdown();
    while (!executor.isTerminated()) {
        try {
            executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            logger.error("Interrupted executor >>", ex.getMessage(), ex);
        }
    }
    logger.debug("All post loading done ...");
    logger.debug("PostList >> " + postList);
    if(postList.getPostList() != null)
        return postList.getPostList();
}

And in PostProcessor I have

public class PostProcessor implements Runnable {

    private Post post;
    private PostList postList;


    public PostProcessor(Post post, PostList postList) {
        super();
        this.post = post;
        this.postList = postList;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        Post answer = null;
        try
        {
            // if Post is parent / is a question
            if ("Q".equalsIgnoreCase(post.getPostType())) 
            {
                // do some operation
                postList.add(post);
            }
            // Post is an Answer, so add the answer to proper Question
            else {
                answer = post;
                postList.addAnswer(answer);
            }
            Thread.sleep(1000);
        }
        catch(Throwable throwable)
        {
            logger.error(throwable.getMessage(),throwable);
        }
    }
}

But it behaving abnormally, some time its loading all question post but not all answers and some times its not loading a parent post at all. Please help where I am doing wrong.

1

There are 1 answers

0
Zim-Zam O'Pootertoot On BEST ANSWER

If addAnswer fails then it should return false or throw an exception; this indicates that the appropriate question has not yet been loaded or doesn't exist. Two options:

  1. Process all questions first, and throw an exception if an answer doesn't match a question.
  2. When you query the database, get a count of questions and decrement this every time a question is processed (do the decrement after you add the question to the post list, otherwise you might wind up with a question_count == 0 but without a question having been added to the list yet); if an answer fails to match up to a question and question_count > 0 then put the answer back on the queue, else throw an exception.

More as a matter of efficiency than correctness, I suggest that you eliminate the synchronized methods and use thread-safe data structures from java.util.concurrent instead - this will reduce lock contention. This would look something like

public class PostList {

    private AtomicInteger questionCount;
    private ConcurrentLinkedQueue<Post> questions;
    private ConcurrentHashMap<String, ConcurrentLinkedQueue<Post>> answers;

    public boolean addQuestion(Post post) {
        questions.offer(post);
        if(answers.putIfAbsent(post.getPostId(), new ConcurrentLinkedQueue<>()) 
             != null) {
            questionCount.decrementAndGet();
            return true;
        } else throw new IllegalArgumentException("duplicate question id");
    }

    public boolean addAnswer(Post answer) {
        ConcurrentLinkedQueue<Post> queue = answers.get(answer.getParentPostId());
        if(queue != null) {
          queue.offer(answer);
          return true;
        } else if(questionCount.get() > 0) {
          return false;
        } else {
          throw new IllegalArgumentException("answer has no question");
        }
    }
}