Duplicate records get written to MongoDB after Hadoop MapReduce (using Mongo Hadoop Connector)


Our Hadoop test environment on AWS EMR:

  • 1 master node
  • 2 slave nodes

When we submit a small test job, it triggers 1 map task. Once the map task is complete, 3 reduce tasks are triggered.

When the reduce tasks are complete, our output data is written to the Mongo collection. However, we have noticed that on some occasions there are duplicate records in the output. This then causes our downstream processing tasks to crash, as they are not expecting duplicates.

One thing I have noticed is that one of the reduce tasks sometimes gets killed and then restarted by Hadoop. Could this cause the duplicate records if the task is killed in the middle of writing data to Mongo?

Is there any way to see from the logs whether the Mongo Hadoop connector is actually writing data to Mongo?
Is there any way to ensure that all data is fully reduced before it is committed to Mongo, so that there are no duplicates?

We have not experienced this issue when the cluster has only 1 master node and 1 slave node. However, this is obviously a major blocker for any attempt at scaling...


Update with resolution to the issue

Based on @ruby's answer, I created bootstrap actions to disable speculative execution on EMR.

MongoDB has also recently released an updated version of the Mongo Hadoop connector which adds support for speculative execution (1.4.0-rc0): https://github.com/mongodb/mongo-hadoop/releases

After upgrading to the latest jar file and adding the bootstrap actions, I found that the issue was still not totally resolved. After further investigation I found that the underlying issue was related to how the output of the combiner step was routed to the reducer tasks. We had not implemented a custom partitioner, so Hadoop was using the hashCode() method of our key entity. That method was based on Java's Objects.hash(), which should not be used in distributed systems because it does not return reliable hash values across separate Java instances.

We implemented our own custom Partitioner and this finally resolved the duplicates issue.
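A minimal sketch of such a partitioner is shown below. It is not our exact code: the Text key type and the class name are placeholders for illustration (our real key is a custom Writable), but the idea is the same, namely deriving the partition from a value whose hash algorithm is specified by the Java language (such as String.hashCode()) rather than from Objects.hash().

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Routes each key to a reducer using a hash that is identical on every JVM.
    public class StableKeyPartitioner extends Partitioner<Text, Writable> {
        @Override
        public int getPartition(Text key, Writable value, int numPartitions) {
            // String.hashCode() is defined by the Java spec, so every node
            // computes the same value for the same key, unlike hash codes that
            // can vary between separate Java instances.
            return (key.toString().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

The partitioner is then registered in the driver with job.setPartitionerClass(StableKeyPartitioner.class).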

1 Answer

Answered by rbyndoor (accepted answer):

Turn off speculative execution by setting these properties in the driver class or in the client-side mapred-site.xml:

  <property>
    <name>mapred.map.tasks.speculative.execution</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.reduce.tasks.speculative.execution</name>
    <value>false</value>
  </property>