I have a flow that submits around 4 Salesforce bulk query job details (all jobs are, and will always be, in an incomplete state at that point) to Anypoint MQ to be processed asynchronously. I am using a normal queue, not a FIFO queue, and I want to process one message at a time per worker. My application is deployed to 4 workers; I simply wanted load balancing across the 4 workers using Anypoint MQ. The XML of the main flow is below. Out of the 4 messages, one gets stuck in in-flight status, while the remaining three are processed by 3 workers just fine.
I later check the status of the job and, once it is confirmed that the job has completed, I send a manual ack. I see the same behaviour when I run this application on my local machine.
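For reference, the upstream flow publishes each job's details to the same queue with a plain anypoint-mq:publish. A minimal sketch of what that publish looks like (the vars.mode/vars.key sources and the payload shape here are assumptions, not the actual upstream code):
<anypoint-mq:publish doc:name="Publish bulk job details" config-ref="Anypoint_MQ_Config" destination="${anyPointMq.name}">
<anypoint-mq:body><![CDATA[#[output application/json --- payload]]]></anypoint-mq:body>
<!-- mode and key are read back by the subscriber flow as message.attributes.properties.*; where they come from here is assumed -->
<anypoint-mq:properties><![CDATA[#[{
mode: vars.mode,
key: vars.key
}]]]></anypoint-mq:properties>
</anypoint-mq:publish>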
configuration
Mule Runtime: 4.4
MQ Connector Version: tried with 3.2.0 and 4.0.2
Anypoint MQ configuration
<anypoint-mq:config name="Anypoint_MQ_Config" doc:name="Anypoint MQ Config" doc:id="ce3aaed9-dcba-41bc-8c68-037c5b1420e2">
<anypoint-mq:connection clientId="${secure::anyPointMq.clientId}" clientSecret="${secure::anyPointMq.clientSecret}" url="${anyPointMq.url}">
<reconnection>
<reconnect frequency="3000" count="3" />
</reconnection>
<anypoint-mq:tcp-client-socket-properties connectionTimeout="30000" />
</anypoint-mq:connection>
</anypoint-mq:config>
flow code
<flow name="sfdc-bulk-query-job-subscription" doc:id="7e1e23d0-d7f1-45ed-a609-0fb35dd23e6a" maxConcurrency="${maxConcurrency.sfs3}">
<anypoint-mq:subscriber doc:name="polling every 10 seconds" doc:id="98b8b25e-3141-4bd7-a9ab-86548902196a" config-ref="Anypoint_MQ_Config" destination="${anyPointMq.name}" acknowledgementTimeout="10" acknowledgementMode="MANUAL" acknowledgementTimeoutUnit="MINUTES">
<anypoint-mq:subscriber-type >
<anypoint-mq:polling fetchSize="1" >
<scheduling-strategy >
<fixed-frequency frequency="10" timeUnit="SECONDS"/>
</scheduling-strategy>
</anypoint-mq:polling>
</anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
<json-logger:logger doc:name="INFO - Bulk Job Details have been fetched" doc:id="b25c3850-8185-42be-a293-659ebff546d7" config-ref="JSON_Logger_Config" message='#["Bulk Job Details have been fetched for " ++ payload.object default ""]'>
<json-logger:content ><![CDATA[#[output application/json ---
payload]]]></json-logger:content>
</json-logger:logger>
<set-variable value='#[message.attributes.properties.mode]' doc:name="mode" doc:id="995777f2-8ad5-431c-88cb-87f3368574ce" variableName="mode"/>
<set-variable value="#[attributes.ackToken]" doc:name="ackToken" doc:id="f78adfc0-30a6-443f-9e84-867dc9c47bbb" variableName="ackToken"/>
<set-variable value="#[payload]" doc:name="jobDetails" doc:id="7cae7d74-78e8-49b8-af60-40552ba1ae30" variableName="jobDetails"/>
<set-variable value="#[payload.id]" doc:name="jobId" doc:id="00dc5489-0ff7-47db-bb7a-173ea342d278" variableName="jobId"/>
<set-variable value="#[p('serviceName.sfdcPartnerToEds')]" doc:name="ServiceName" doc:id="f1ece944-0ed8-4c0e-94f2-3152956a2736" variableName="ServiceName"/>
<set-variable value="#[payload.object]" doc:name="sfObject" doc:id="2857c8d9-fe8d-46fa-8774-0eed91e3a3a6" variableName="sfObject" />
<set-variable value="#[message.attributes.properties.key]" doc:name="key" doc:id="57028932-04ab-44c0-bd15-befc850946ec" variableName="key" />
<try doc:name="Try" doc:id="0fafa0cd-3260-4d54-ad82-b830a5bc9e50" >
<flow-ref doc:name="bulk-job-status-check" doc:id="c6b9cd40-4674-47b8-afaa-0f789ccff657" name="bulk-job-status-check" />
<error-handler >
<on-error-propagate enableNotifications="true" logException="true" doc:name="On Error Propagate" doc:id="97187f3c-69be-4609-964c-52674215ce2d" >
<flow-ref doc:name="errorResponse" doc:id="2985af0c-e134-4fd9-a46a-a9f9dbf37628" name="errorResponse"/>
<json-logger:logger doc:name="Error - Error has occured while processing bulk job" doc:id="d74dc134-0e63-4b0d-8e6d-47825c5181f9" config-ref="JSON_Logger_Config" message='#["Error has occured while processing bulk job" ++ vars.jobId ++ ". Submitting job details to dl queue for manual reprocessing"]' tracePoint="EXCEPTION" priority="ERROR">
<json-logger:content><![CDATA[#[output application/json ---
{
}]]]></json-logger:content>
</json-logger:logger>
<anypoint-mq:publish doc:name="Publishing to sfdc-learner-eds-ingestion-dlq-dev" doc:id="170e324d-f1f1-4d2a-9759-c7b1d6ace0a6" config-ref="Anypoint_MQ_Config" destination="${anyPointMq.sfPartnerEds.dlQueueName}">
<anypoint-mq:body ><![CDATA[#[vars.jobDetails]]]></anypoint-mq:body>
<anypoint-mq:properties ><![CDATA[#[{
key: vars.key
}]]]></anypoint-mq:properties>
</anypoint-mq:publish>
</on-error-propagate>
</error-handler>
</try>
<json-logger:logger doc:name="INFO - subscribed bulk job id has been processed successfully" doc:id="7e469f92-2aff-4bf4-84d0-76577d44479a" config-ref="JSON_Logger_Config" message='#["subscribed bulk job id " ++ vars.jobId default "" ++ " has been processed successfully for salesforce " ++ vars.sfObject default "" ++ " object"]' tracePoint="END"/>
</flow>
status check flow with ack
<sub-flow name="bulk-job-status-check" doc:id="c7d8c732-dabe-4618-b308-5b652ac27494">
<try doc:name="Try" doc:id="4ff88a65-d44d-418f-a952-2f296bc509c5" >
<until-successful maxRetries="#[vars.munitTestRetry default p('retry.jobStatusRetries')]" doc:name="Until Bulk Job Status is JobCompleted or Failed" doc:id="393c51e8-9634-4126-bbe0-79a2bbef9304" millisBetweenRetries="${retry.jobStatusRetriesInternval}">
<flow-ref doc:name="execute-sfdc-query-job-status-check-request" doc:id="a341f2d8-55d3-403c-be4e-69ae31af5e00" name="execute-sfdc-query-job-status-check-request" />
<json-logger:logger doc:name="bulk job status" doc:id="8b1aceaf-9a68-40ec-93c1-a64038c076b5" config-ref="JSON_Logger_Config" message='#["query job status for " ++ vars.sfObject default "" ++ " bulk job id " ++ payload.id ++ " is " ++ payload.state]' tracePoint="AFTER_REQUEST">
<json-logger:content><![CDATA[#[output application/json ---
{}]]]></json-logger:content>
</json-logger:logger>
<validation:is-true doc:name="is salesforce job completed?" doc:id="5bac1fc1-5a07-40b1-92cb-d99eea98fd81" expression='#[(payload.state == "JobComplete" ) or (payload.state =="Failed")]' message='#["Bulk Job " ++ payload.id ++ " is still in progress and will be retried after few minutes"]' />
</until-successful>
<error-handler >
<on-error-propagate enableNotifications="true" logException="true" doc:name="On Error Propagate" doc:id="e6c2fe69-5721-4474-9787-0d7e6eb6c8cf" type="VALIDATION:INVALID_BOOLEAN">
<json-logger:logger doc:name="After 5 retries bulk job status is still in progress" doc:id="d77fd2ff-8c7e-418d-b061-456cbc72c029" config-ref="JSON_Logger_Config" message="After 5 retries bulk job status is still in progress, this will be retried again. Sending Nack to queue"/>
<anypoint-mq:nack doc:name="Nack" doc:id="9c71ddd2-0c4b-480e-a9a5-b86363ea82f4" config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
</on-error-propagate>
</error-handler>
</try>
<anypoint-mq:ack doc:name="Ack" doc:id="1d2479c9-a512-428a-b67f-1c27203d851c" config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
<choice doc:name="Failed?" doc:id="2374edfe-de02-49e9-839d-14727eacf20e" >
<when expression='#["JobComplete" == payload.state]'>
<flow-ref doc:name="get-result-and-write-file-to-s3" doc:id="23026cbb-be9b-41ab-a7fa-08cf0da9979d" name="get-result-and-write-file-to-s3" />
</when>
<otherwise >
<ee:transform doc:name="errorObj" doc:id="52d0629a-bc32-471b-adf6-20df1b6f8a82">
<ee:message>
</ee:message>
<ee:variables >
<ee:set-variable variableName="errorObj" ><![CDATA[%dw 2.0
output application/json
---
{
code: p('ERR.CODE'),
message: "Bulk Query Job with id " ++ payload.id default "" ++ " Failed to complete for sfdc object " ++ (vars.sfObject default "")
}]]></ee:set-variable>
</ee:variables>
</ee:transform>
<flow-ref doc:name="errorResponse" doc:id="f1436516-eac9-4e05-bf1c-e359a36aebd8" name="errorResponse" />
</otherwise>
</choice>
</sub-flow>
NEXT FLOW
<sub-flow name="get-result-and-write-file-to-s3" doc:id="8328b35e-a076-48a0-a5c4-55d62a0197ba" >
<flow-ref doc:name="execute-sfdc-get-query-job-result" doc:id="1a764a0b-7d13-4fe6-a46a-5bf56093218d" name="execute-sfdc-get-query-job-result"/>
<choice doc:name="Choice" doc:id="02189074-da6d-4622-949f-bc8f59db81c5" >
<when expression='#["batch" == vars.mode]'>
<flow-ref doc:name="sfdc-learner-services-to-eds-sync-batchFlow" doc:id="5c7746b4-9178-476d-af94-14a4253efa24" name="sfdc-to-eds-sync-batchFlow"/>
</when>
<otherwise >
<ee:transform doc:name="Transforming to CSV" doc:id="b5c26052-9c52-47f7-bb39-c31f9f52ec79">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/csv quoteValues = true, quote="\"", escape= "\"" , deferred=true
---
payload]]></ee:set-payload>
</ee:message>
</ee:transform>
<compression:compress doc:name="Compress to Gzip" doc:id="f4314e0c-72a9-48b2-a48c-3816e7903f41">
<compression:compressor>
<compression:gzip-compressor />
</compression:compressor>
</compression:compress>
<flow-ref doc:name="S3-connector-flow" doc:id="52f29aec-609d-468b-8daf-88faf1517beb" name="S3-connector-flow" />
</otherwise>
</choice>
</sub-flow>
GET RESULT FLOW
<sub-flow name="execute-sfdc-get-query-job-result" doc:id="c8602e65-753e-4607-ab54-2bed22986851">
<json-logger:logger doc:name="INFO - getting bulk query job results" doc:id="e38d9741-b404-40b7-91c8-5212b49eb61a" config-ref="JSON_Logger_Config" message='#["getting bulk query job results for " ++ vars.sfObject default "" ++ " for job id: " ++ payload.id default ""]' tracePoint="BEFORE_REQUEST" >
<json-logger:content ><![CDATA[#[output application/json --- {}]]]></json-logger:content>
</json-logger:logger>
<set-variable value='#["GET_JOB_RESULT"]' doc:name="operationName" doc:id="45ddd3b1-3f11-4dff-9bd8-5c27b2301f5a" variableName="operationName" />
<until-successful maxRetries="#[vars.munitTestRetry default p('retry.maxRetries')]" doc:name="Until Successful" doc:id="dffa3627-b4fb-4558-aa00-c36a3acb70b1" millisBetweenRetries="#[p('retry.millisecondsBetweenRetries')]">
<try doc:name="Try" doc:id="7df6dc7a-235e-4e74-9e50-1d80488e3882" >
<salesforce:get-query-job-results-bulk-api-v2 doc:name="Get query job results bulk api v 2" doc:id="455a0795-fae0-4ac7-b365-914e73a2edc3" config-ref="Salesforce_Config_Learners" id="#[payload.id]" maxRecordsPerPage="#[p('sf.getBulkResult.maxRecordPP') default 2000]">
<reconnect frequency="${sf.retryFrequency}" count="${sf.reconnectionAttempts}" blocking="false"/>
</salesforce:get-query-job-results-bulk-api-v2>
<error-handler >
<on-error-propagate enableNotifications="true" logException="true" doc:name="On Error Propagate" doc:id="80635adb-1838-4891-96e4-d97901423234" type="SALESFORCE:CONNECTIVITY, SALESFORCE:TIMEOUT" >
<json-logger:logger doc:name="ERRO - error description" doc:id="00b962e0-e9cc-4d42-aaab-9ce92a3e8d72" config-ref="JSON_Logger_Config" message='#[error.description default "Unexpected technical error"]' tracePoint="EXCEPTION" priority="ERROR"/>
</on-error-propagate>
<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="4ec614fe-2333-4ceb-86d0-a4343f2a775a" type="ANY" >
<set-variable value='#[error.description default "Unexpected technical error"]' doc:name="errorDetails" doc:id="817e5ceb-40e6-4cb4-94e0-4b74abfbd833" variableName="errorDetails" />
<set-variable value="#[true]" doc:name="errorCheck" doc:id="8ba215b0-ec2a-4f70-ad3a-e86d2cc95bc4" variableName="errorCheck" />
</on-error-continue>
</error-handler>
</try>
</until-successful>
<choice doc:name="request succeeded?" doc:id="1dae0b55-52ce-41ea-b182-6d95db7e5db5">
<when expression="#[vars.errorCheck]">
<raise-error doc:name="Raise error" doc:id="873f5e4e-9c9a-4160-b71f-0e2386232b44" type="SALESFORCE:CUSTOM_ERROR_HANDLER" description="#[vars.errorDetails]" />
</when>
<otherwise>
<json-logger:logger doc:name="DEBUG - bulk query job result reponse" doc:id="cb5d6631-dc6d-490d-88f7-4c28ae808abc" config-ref="JSON_Logger_Config" message="bulk query job result reponse" tracePoint="AFTER_REQUEST" priority="DEBUG" category="${jsonLogger.logLevel.debugS}">
<json-logger:content><![CDATA[#[output application/json --- payload]]]></json-logger:content>
</json-logger:logger>
</otherwise>
</choice>
</sub-flow>
S3 call flow
<sub-flow name="S3-connector-flow" doc:id="c85e5d31-7bd6-4092-a958-ac472f8be1b8">
<json-logger:logger doc:name="INFO - calling aws conector" doc:id="c51f8751-8542-495d-82b0-51cb4887715b" config-ref="JSON_Logger_Config" message='#["Calling AWS conector to create file for " ++ vars.sfObject default ""]' tracePoint="BEFORE_REQUEST">
<json-logger:content><![CDATA[#[output application/json
---
{
"bucketName" : p('aws.bucket'),
"key" : vars.key
}]]]></json-logger:content>
</json-logger:logger>
<set-variable value='#["POST_TO_AWS_S3"]' doc:name="operationName" doc:id="2a85647c-970b-4a7f-a037-561e2278c788" variableName="operationName" />
<until-successful maxRetries="5" doc:name="Until Successful" doc:id="3d3be64a-80df-44d6-a73d-f0958940a155" >
<try doc:name="Try" doc:id="8d7e5930-3d94-4a21-b404-ffcee59bcfdc">
<s3:put-object doc:name="Put Object" doc:id="435b99af-fee7-4f57-a05a-00a4e05d66e5" config-ref="Coursera_S3_Configuration" bucketName="sourced-dev" key="#[vars.key]"/>
<error-handler>
<on-error-propagate enableNotifications="true" logException="true" doc:name="On Error Propagate" doc:id="d2b30077-e991-4bcb-92f8-878d89d97937" type="S3:CONNECTIVITY, S3:RETRY_EXHAUSTED, S3:SERVER_BUSY">
<json-logger:logger doc:name="ERROR - error occured while calling aws connector" doc:id="0ed855ea-da37-4be6-958c-0e96181ca8bb" config-ref="JSON_Logger_Config" message="error occured while calling aws connector" tracePoint="EXCEPTION" priority="ERROR">
<json-logger:content><![CDATA[#[output application/json ---
error.description]]]></json-logger:content>
</json-logger:logger>
</on-error-propagate>
<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="247aad5c-a96b-4688-842c-4c043e0bcbbc">
<set-variable value='#[error.description default "Unexpected technical error"]' doc:name="errorDetails" doc:id="9ab69a52-993c-4329-9e8f-d433a6738438" variableName="errorDetails" />
<set-variable value="#[true]" doc:name="errorCheck" doc:id="409062b1-cc68-4b05-984c-2eef55c3ad37" variableName="errorCheck" />
</on-error-continue>
</error-handler>
</try>
</until-successful>
<choice doc:name="request succeeded?" doc:id="7829914a-0c22-4f4e-a3ea-05924216d981" >
<when expression="#[vars.errorCheck]" >
<flow-ref doc:name="errorResponse" doc:id="75864928-0558-489e-aadc-437ac828a6d1" name="errorResponse"/>
<json-logger:logger doc:name="ERROR - Error occured while calling aws system api" doc:id="d84c48b4-68bb-4f66-887a-4eb0b9da95e2" config-ref="JSON_Logger_Config" message='#["AWS System APIs gave error for " ++ vars.sfObject default "" ++ " for job id " ++ vars.jobId]' tracePoint="EXCEPTION">
<json-logger:content><![CDATA[#[output application/json ---
{}]]]></json-logger:content>
</json-logger:logger>
<raise-error doc:name="Raise error" doc:id="36ec0d2f-3825-40c8-a2da-5a5fcf578720" type="AWS:CUSTOM_ERROR_HANDLER" description="#[vars.errorDetails]" />
</when>
<otherwise >
<json-logger:logger doc:name="INFO - AWS System API Response" doc:id="48b46907-0ce2-4e7d-9970-3b727f7a5ee9" config-ref="JSON_Logger_Config" message='#["AWS System APIs response received for " ++ vars.sfObject default "" ++ " for job id " ++ vars.jobId]' tracePoint="AFTER_REQUEST" >
<json-logger:content ><![CDATA[#[output application/json ---
payload]]]></json-logger:content>
</json-logger:logger>
</otherwise>
</choice>
</sub-flow>

I had the same issue during testing. I realized that Anypoint MQ puts a message back on the queue if an acknowledgement is not received before the acknowledgement timeout elapses. The default timeout on each Anypoint MQ queue is 2 minutes, and because I was running in debug mode, processing was paused for several minutes. Once I removed the breakpoints in the debugger, the message was acked successfully at the manual ack step. I tested this multiple times, and the behaviour was consistent with the ack timeout value.
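Based on that, one mitigation is to make sure the acknowledgement window comfortably covers the longest expected processing time, including the until-successful retries. A minimal sketch of the subscriber with a longer timeout, reusing the config and property names from the question (the 30-minute value is only an illustration, not a recommendation):
<!-- 30 minutes is illustrative; size the timeout to your worst-case processing time -->
<anypoint-mq:subscriber doc:name="subscriber with longer ack window" config-ref="Anypoint_MQ_Config" destination="${anyPointMq.name}" acknowledgementMode="MANUAL" acknowledgementTimeout="30" acknowledgementTimeoutUnit="MINUTES">
<anypoint-mq:subscriber-type>
<anypoint-mq:polling fetchSize="1">
<scheduling-strategy>
<fixed-frequency frequency="10" timeUnit="SECONDS"/>
</scheduling-strategy>
</anypoint-mq:polling>
</anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
As long as the manual ack or nack fires before this window expires, the message stays in flight for that worker instead of being returned to the queue and redelivered.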