I have MapReduce jobs failing hard on Amazon EMR because, if the first attempt fails to copy its results to S3, the (probably partial) file is still created, and subsequent reduce attempts then refuse to write to a file that already exists.

The first attempt log:

2014-11-30 06:56:19,774 INFO [main] com.amazonaws.latency: StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: null; Request ID: removed), S3 Extended Request ID: removed=], ServiceName=[Amazon S3], AWSErrorCode=[null], AWSRequestID=[removed], ServiceEndpoint=[https://devel.rui.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=0, ClientExecuteTime=[130.087], HttpRequestTime=[118.72], HttpClientReceiveResponseTime=[32.585], RequestSigningTime=[0.646], HttpClientSendRequestTime=[0.835], 
2014-11-30 06:56:19,803 INFO [main] com.amazonaws.latency: StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: null; Request ID: removed), S3 Extended Request ID: 1removed=], ServiceName=[Amazon S3], AWSErrorCode=[null], AWSRequestID=[removed], ServiceEndpoint=[https://removed.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[27.899], HttpRequestTime=[26.898], HttpClientReceiveResponseTime=[9.405], RequestSigningTime=[0.559], HttpClientSendRequestTime=[1.016], 
2014-11-30 06:56:19,939 INFO [main] com.amazonaws.latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[removed], ServiceEndpoint=[https://removedi.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[127.219], HttpRequestTime=[20.791], HttpClientReceiveResponseTime=[15.467], RequestSigningTime=[0.391], ResponseProcessingTime=[82.617], HttpClientSendRequestTime=[0.955], 
2014-11-30 06:56:19,999 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords

A retry attempt log (they all look the same):

RequestSigningTime=[0.663], ResponseProcessingTime=[12.466], HttpClientSendRequestTime=[0.832], 
2014-11-30 07:23:56,526 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : 

java.io.IOException: File already exists:s3n://removed/removed/part-r-00005.gz

    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:615)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:891)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:788)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:169)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.<init>(ReduceTask.java:548)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:622)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

The funny thing is that if I open part-r-00005.gz, it has content inside and it is in the format it is supposed to be.

Any ideas how to solve this (and how to do it)? Either a) deal with the latency (e.g. increase the timeout), or b) have the retry delete the existing file if it already exists.
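For (b), this is roughly what I had in mind: an untested sketch of a TextOutputFormat subclass that deletes the leftover part file before the retried attempt re-creates it. It assumes gzip output compression (so the ".gz" extension matches what will actually be written) and that getDefaultWorkFile resolves to the same S3 path the failed attempt wrote to, which may depend on the output committer EMR uses.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    // Untested sketch: remove a partial part file left by a failed attempt
    // before the retried reduce attempt opens it again.
    public class OverwritingTextOutputFormat<K, V> extends TextOutputFormat<K, V> {

        @Override
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            // Assumes gzip compression, i.e. the same ".gz" suffix seen in the error.
            Path file = getDefaultWorkFile(job, ".gz");
            FileSystem fs = file.getFileSystem(conf);
            if (fs.exists(file)) {
                fs.delete(file, false); // drop the partial output from the failed attempt
            }
            return super.getRecordWriter(job);
        }
    }

It would be wired in with job.setOutputFormatClass(OverwritingTextOutputFormat.class), but I'm not sure whether silently overwriting is the right fix or just hides the underlying S3 latency problem.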

1 answer (from mattwise):

You can modify your job to write output to a temporary directory that is named with a job ID or timestamp for uniqueness, then, when processing is complete, move the contents to your desired output location. That way, if something goes wrong after partial output has been written, your desired output directory isn't affected. It also means you won't accidentally read that partial output from the failed job.
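A minimal driver-side sketch of that pattern (bucket names, paths, and the job name are made up for illustration; note that on s3n a rename is implemented as copy-and-delete, so the final "move" is not atomic):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TempThenPromote {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "temp-then-promote");
            // ... set jar, mapper, reducer, key/value classes, input paths as usual ...

            // Hypothetical paths for illustration only.
            Path finalOutput = new Path("s3n://my-bucket/output");
            // Unique temporary prefix per run; a timestamp keeps re-runs apart.
            Path tempOutput = new Path("s3n://my-bucket/tmp/run-" + System.currentTimeMillis());
            FileOutputFormat.setOutputPath(job, tempOutput);

            if (job.waitForCompletion(true)) {
                FileSystem fs = finalOutput.getFileSystem(conf);
                fs.mkdirs(finalOutput);
                // Promote part files only after the whole job has succeeded.
                for (FileStatus status : fs.listStatus(tempOutput)) {
                    fs.rename(status.getPath(), new Path(finalOutput, status.getPath().getName()));
                }
                fs.delete(tempOutput, true); // clean up the temporary prefix
            }
        }
    }

Because the job only ever writes to the fresh temporary prefix, a retried reduce attempt never collides with a partial file under the final output path.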