Hi I am using Executors for loading some data in parallel. My application is fetching some data from DB which have parent-child relation like:
parent 1 -> [child11, child12,..., child1N]
parent 2 -> [child21, childy22,..., child2N]
.....
parent N -> [childN1, childyN2,..., childNN]
Now here I want parallel processing. What I am doing now is loading all data of a parent child set at a time from DB and calling executor service to map those in relationship and store in my data structure.
Now the code I have for this:
The Parent Child relation is like:
public class Post implements Serializable {
private static final long serialVersionUID = 1L;
private Integer postId;
private String postText;
private String postType;
private Integer menuItemId;
private boolean parentPost;
private Integer parentPostId;
// Contains all the Child of this Post
private List<Post> answers = new ArrayList<Post>();
....
//getters and setters
}
Now I have a wrapper for this Post class for synchronization
public class PostList {
private List<Post> postList;
public PostList() {
super();
this.postList = new ArrayList<Post>();
}
public List<Post> getPostList() {
return postList;
}
public synchronized boolean add(Post post) {
return postList.add(post);
}
public synchronized boolean addAnswer(Post answer) {
for(Post post : postList)
{
if(post.getPostId() == answer.getParentPostId())
{
post.getAnswers().add(answer);
break;
}
}
return true;
}
}
Now My Loading code from DB is:
/* This is called to load each parent-child set at a time, when the
first set is fetched from DB then call to executor to store those in
internal data structure. */
List<Post> posts = null;
PostList postList = null;
Integer args[] ={menuItemId};
// Fetch all Posts which are in parent child relation
posts = getDataFromDB(...)
if(posts != null && posts.size() >0)
{
postList = new PostList();
ExecutorService executor = Executors.newFixedThreadPool(10);
for(Post post : posts)
{
executor.execute(new PostProcessor(post, postList));
}
logger.debug("Starting executor shutdown...");
executor.shutdown();
while (!executor.isTerminated()) {
try {
executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
logger.error("Interrupted executor >>", ex.getMessage(), ex);
}
}
logger.debug("All post loading done ...");
logger.debug("PostList >> " + postList);
if(postList.getPostList() != null)
return postList.getPostList();
}
And in PostProcessor I have
public class PostProcessor implements Runnable {
private Post post;
private PostList postList;
public PostProcessor(Post post, PostList postList) {
super();
this.post = post;
this.postList = postList;
}
@Override
public void run() {
// TODO Auto-generated method stub
Post answer = null;
try
{
// if Post is parent / is a question
if ("Q".equalsIgnoreCase(post.getPostType()))
{
// do some operation
postList.add(post);
}
// Post is an Answer, so add the answer to proper Question
else {
answer = post;
postList.addAnswer(answer);
}
Thread.sleep(1000);
}
catch(Throwable throwable)
{
logger.error(throwable.getMessage(),throwable);
}
}
}
But it behaving abnormally, some time its loading all question post but not all answers and some times its not loading a parent post at all. Please help where I am doing wrong.
If
addAnswer
fails then it should return false or throw an exception; this indicates that the appropriate question has not yet been loaded or doesn't exist. Two options:question_count == 0
but without a question having been added to the list yet); if an answer fails to match up to a question andquestion_count > 0
then put the answer back on the queue, else throw an exception.More as a matter of efficiency than correctness, I suggest that you eliminate the
synchronized
methods and use thread-safe data structures from java.util.concurrent instead - this will reduce lock contention. This would look something like