AWS SQS Java. Not all messages are retrieved from the SQS queue

16.2k views Asked by At

I have been trying several approaches to retrieve all messages from the SQS queue by using AWS SDK for Java to no avail. I have read about the distributed nature of the AWS SQS and that messages are stored on the different servers. But what I do not understand is why this architecture is not hidden from the end user. What tricks do I have to apply in Java code to retrieve all messages and be 100% sure that no one was missed?

I tried this with the "Long Polling":

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();

And this with Request Batching / Client-Side Buffering:

    // Create the basic Amazon SQS async client
    AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();

    // Create the buffered client
    AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);

    CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");

    CreateQueueResult res = bufferedSqs.createQueue(createRequest);

    SendMessageRequest request = new SendMessageRequest();
    String body = "test message_" + System.currentTimeMillis();
    request.setMessageBody( body );
    request.setQueueUrl(res.getQueueUrl());

    SendMessageResult sendResult = bufferedSqs.sendMessage(request);

    ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
    .withMaxNumberOfMessages(10)
    .withQueueUrl(queueUrl);
    ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);

    List<Message> messages = rx.getMessages();
    for (Message message : messages) {
    System.out.println(" Message");
    System.out.println(" MessageId: " + message.getMessageId());
    System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
    System.out.println(" MD5OfBody: " + message.getMD5OfBody());
    System.out.println(" Body: " + message.getBody());
    for (Entry<String, String> entry : message.getAttributes().entrySet()) {
    System.out.println(" Attribute");
    System.out.println(" Name: " + entry.getKey());
    System.out.println(" Value: " + entry.getValue());
    }
    }

But I am still unable to retrieve all messages.

Any idea?

AWS Forum keeps silence on my post.

4

There are 4 answers

7
Matt Houser On BEST ANSWER

When receiving messages from an SQS queue, you need to repeatedly call sqs:ReceiveMessage.

On each call to sqs:ReceiveMessage, you will get 0 or more messages from the queue which you'll need to iterate through. For each message, you'll also need to call sqs:DeleteMessage to remove the message from the queue when you're done processing each message.

Add a loop around your "Long Polling" sample above to receive all messages.

for (;;) {
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
    }
    System.out.println();
}

Also note that you may receive the same message more than once. So allow your work to "reprocess" the same message, or detect a repeated message.

1
Lubo Sach On

Long polling will wait if there is no message in Queue. This means that if you call ReceiveMessage with long polling in loop you are guaranteed that you will get all messages. When there is 0 messages received in response, you've already received all messages.

You mentioned that you used also web console. Web console works in same way as calling API with SDK. This means that when you receive and see messages in console, messages are invisible to other clients until visibility timeout expires. That's probably reason why you don't see messages.

See more information about visibility timeout:

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html

1
Shally Dhar On

I too was facing same issue - only one message was getting returned , then i tried receiveMessageRequest.setMaxNumberOfMessages(10) , which would help me in retrieving 10 messages in a loop,

since my queue has >500 records what i did was

    List<String> messagelist = new ArrayList<>();

    try
    {
        AmazonSQS sqs = new AmazonSQSClient(credentials);
        Region usWest2 = Region.getRegion(Regions.US_WEST_2);
        sqs.setRegion(usWest2);
        boolean flag = true;

        while(flag)
        {
            ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queuename);
            receiveMessageRequest.setMaxNumberOfMessages(number_of_message_);
            receiveMessageRequest.withMaxNumberOfMessages(number_of_message_).withWaitTimeSeconds(wait_time_second_);
            List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();

            for (Message message : messages) 
                  {
                    //   System.out.println("    Body:          " + message.getBody());
                       messagelist.add( message.getBody());

                       String messageReceiptHandle = message.getReceiptHandle();
                       sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queuename).withReceiptHandle(messageReceiptHandle));
                    }
            if(messages.size()==0)
            {
                flag = false;
            }
        }         

    }
    catch (AmazonServiceException ase) {
        ase.printStackTrace();
    } catch (AmazonClientException ace) {
       ace.printStackTrace();
    }
    finally {
        return messagelist ;
    }

I am reading records from SQS then saving it into a String list and then deletion the record from queue.

so in the end i will have all the data from the queue in a list

0
E.J. Brennan On

An SQS queue is not a database. You can't read all the messages into a list like you are trying to do. There is no beginning and no end to the queue. You poll the queue and ask for some messages, it returns you some messages if they exist.

If you want a method that can return the entire dataset, then sqs is not the right tool - a traditional database might be better in that case.