Integrating Step Function and SQS with EventBridge Pipes Failed

254 views Asked by At

I need to integrate a SQS with my Step Function using EventBridge Pipes. The Pipe is composed of SQS fifo queue (Source) and Step Function (Target). Below is the Step Function definition:


  "Comment": "A description of my state machine",
  "StartAt": "Pass",
  "States": {
    "Pass": {
      "Type": "Pass",
      "Next": "List Executions",
      "Parameters": {
        "messageId.$": "$.[0].messageId",
        "receiptHandle.$": "$.[0].receiptHandle",
        "body.$": "$.[0].body",
        "attributes.$": "$.[0].attributes"
      }
    },
    "List Executions": {
      "Type": "Task",
      "Parameters": {
        "StateMachineArn.$": "$$.StateMachine.Id",
        "StatusFilter": "RUNNING"
      },
      "Resource": "arn:aws:states:::aws-sdk:sfn:listExecutions",
      "ResultSelector": {
        "StateMachineArn.$": "$$.StateMachine.Id",
        "uuid.$": "States.UUID()",
        "runningExecutionsCount.$": "States.ArrayLength($.Executions)"
      },
      "Next": "Receive Message",
      "ResultPath": "$.ExecutionDetails"
    },
    "Receive Message": {
      "Type": "Task",
      "Next": "Process Finished",
      "Parameters": {
        "MaxNumberOfMessages": 1,
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/xxx/test-sqs.fifo"
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
      "ResultSelector": {
        "body.$": "States.StringToJson($.Messages[0].Body)",
        "ReceiptHandle.$": "$.Messages[0].ReceiptHandle"
      }
    },
    "Process Finished": {
      "Type": "Pass",
      "End": true
    }
  }
}

This Step Function is failed each time a simple message like {"message": "Hi there!"} is sent from SQS:

Exception:

An error occurred while executing the state 'ReceiveMessage' (entered at the event id #9). The function 'States.StringToJson($.Messages[0].Body)' had the following error: The JsonPath argument for the field '$.Messages[0].Body' could not be found in the input '{}'

It seems that the input message is sort of ignored.

1

There are 1 answers

2
Thomas On BEST ANSWER

Currently, the error is caused by the fact that you are using SQS to trigger your State Machine and then trying to fetch another message from the same queue (in your Receive Message state) that has no more messages in it.

If you remove the ResultSelector from the Receive Message state you'll see the output is

{
  "Messages": []
}

which does not contain anything since the message you sent has already been consumed by triggering your State Machine (which should now succeed).