AWS Textract Asynchronous Queue Messaging

34 views Asked by At

I'm getting the following error when calling DeleteMessageCommand for my pdf processing.

error InvalidAddress: The address arn:aws:sns:us-west-1:**** is not valid for this endpoint.

Here's where the error is happening:

async function processDocumentWithTextract(
  documentName: string,
  sqsQueueUrl: string,
  snsTopicArn: string
) {
  let jobFound = false
  let succeeded = false
  let response

  response = await textract.send(
    new StartDocumentAnalysisCommand({
      DocumentLocation: {
        S3Object: { Bucket: env.BUCKET_NAME, Name: documentName },
      },
      NotificationChannel: { RoleArn: env.ROLE_ARN, SNSTopicArn: snsTopicArn },
      FeatureTypes: ["TABLES"],
    })
  )

  const startJobId = response.JobId
  console.log(`Started job with ID: ${startJobId}`)

  // Poll SQS queue for job completion message
  while (!jobFound) {
    const sqsReceivedResponse = await sqsClient.send(
      new ReceiveMessageCommand({
        QueueUrl: sqsQueueUrl,
        MaxNumberOfMessages: 10,
      })
    )

    console.log("sqsReceivedResponse", sqsReceivedResponse)

    if (
      sqsReceivedResponse.Messages &&
      sqsReceivedResponse.Messages.length > 0
    ) {
      for (const message of sqsReceivedResponse.Messages) {
        const notification = JSON.parse(message.Body as string)
        const textractMessage = JSON.parse(notification.Message)

        if (textractMessage.JobId === startJobId) {
          console.log("Matching job found:", textractMessage.JobId)
          jobFound = true
          succeeded = textractMessage.Status === "SUCCEEDED"
          if (succeeded) {
            console.log("Job processing succeeded.")
            console.log("queueUrl", sqsQueueUrl)
            await sqsClient.send(
              new DeleteMessageCommand({
                QueueUrl: sqsQueueUrl,
                ReceiptHandle: message.ReceiptHandle,
              })
            )
            break
          }
        } else {
          await sqsClient.send(  // ---------------> error here, sqsQueueUrl looks correct when logging
            new DeleteMessageCommand({
              QueueUrl: sqsQueueUrl,
              ReceiptHandle: message.ReceiptHandle,
            })
          )
        }
      }
    }

    if (!jobFound) {
      // Sleep for a short while before polling the queue again
      console.log("polling queue...")
      await new Promise((resolve) => setTimeout(resolve, 5000))
    }
  }

  if (succeeded) {
    // Retrieve the results from Textract
    return startJobId
  } else {
    throw new Error("Textract job did not complete successfully.")
  }
}

Any ideas why this would be happening?

0

There are 0 answers