SQL Message Broker leaving messages in Sending Queue

3.6k views Asked by At

We're putting messages into a SQL Server message queue, by a trigger on a table. (When a field is updated, we build some XML, and call the trigger below).

CREATE PROCEDURE [dbo].[up_CarePay_BrokerSendXml] 
    -- Add the parameters for the stored procedure here
    @Data VARCHAR(MAX) 

AS
BEGIN

    DECLARE @InitDlgHandle UNIQUEIDENTIFIER
    DECLARE @RequestMessage VARCHAR(1000) 
    BEGIN TRY
          BEGIN TRAN

                BEGIN DIALOG CONVERSATION @InitDlgHandle 
                FROM SERVICE [//IcmsCarePay/Service/Initiator]
                TO SERVICE N'//IcmsCarePay/Service/Target'
                ON CONTRACT [//IcmsCarePay/Contract]
                WITH ENCRYPTION = OFF;

                SEND ON CONVERSATION @InitDlgHandle
                MESSAGE TYPE [//IcmsCarePay/Message/Request] (@Data);

          COMMIT TRAN;
    END TRY
    BEGIN CATCH
          ROLLBACK TRAN;
          DECLARE @Message VARCHAR(MAX);
          SELECT @Message = ERROR_MESSAGE();
          PRINT @Message
    END CATCH;

END

This works. A message is placed in the queue.

The message is then sent to the receiving queue on the same server - different database. We then run a proc every minute, which grabs the message from the target queue, and processes it into a staging table for processing. The message is then out of the target queue, and this all works without error.

However...

When I check the initiaitor queue, where the message came from, it's filling up with message.

SELECT TOP 1000 *, casted_message_body = 
CASE message_type_name WHEN 'X' 
  THEN CAST(message_body AS NVARCHAR(MAX)) 
  ELSE message_body 
END 
FROM [ICMS].[dbo].[IcmsCarePayInitiatorQueue] WITH(NOLOCK)

I'd have thought that when the message went from the initiator, to the target, the initiator would disappear. But it seems to be filling up.

I note that the messages in the initiator have a 'message_type_id' of 2, a 'validation' of 'E' and message body and casted message body are NULL. There all have a message_type_name of 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'.

On the destination database side, here is the procedure used to get the messages from the queue:

CREATE PROCEDURE [dbo].[up_CarePayBrokerReceiveXml]   
AS
BEGIN  
  SET NOCOUNT ON;  

  DECLARE @XML XML, @Response XML = 'OK', @ConversationHandle UNIQUEIDENTIFIER, @message_type_name SYSNAME, @message_body VARBINARY(MAX), @source_table VARCHAR(100)
  DECLARE @Message VARCHAR(MAX), @Line INT, @Proc VARCHAR(MAX), @Exception VARCHAR(MAX)  

  WHILE ( 1 = 1 )
  BEGIN  
    -- Clear variables, as they may have been populated in previous loop.
    SET @message_type_name = NULL
    SET @message_body = NULL
    SET @ConversationHandle = NULL  
    SET @source_table = NULL

    BEGIN TRY 
      BEGIN TRAN

        WAITFOR (    -- Pop off a message at a time, and add to storage table.
           RECEIVE TOP (1) 
               @message_type_name = message_type_name  
             , @message_body = message_body  
             , @ConversationHandle = conversation_handle  
             , @source_table = CAST([message_body] AS XML).value('(/row/@SourceTable)[1]', 'varchar(50)')  
           FROM dbo.IcmsCarePayTargetQueue  
        ), TIMEOUT 3000;  

        IF @@ROWCOUNT = 0
        BEGIN  
          ROLLBACK  -- Complete the Transaction (Rollback, as opposeed to Commit, as there is nothing to commit).
          BREAK  
        END

        -- Code removed for example, but the fields are saved to a staging table in the database here...

         -- Respond to Initiator  
        SEND ON CONVERSATION @ConversationHandle MESSAGE TYPE [//IcmsCarePay/Message/Response](@Response);  
        END CONVERSATION @ConversationHandle;  

      COMMIT -- End of Transaction

    END TRY
    BEGIN CATCH
      -- End the conversation
      END CONVERSATION @ConversationHandle WITH CLEANUP  

      -- Get details about the issue.
      SELECT  @Exception = ERROR_MESSAGE(), @Line = ERROR_LINE(), @Proc = ERROR_PROCEDURE(), @Message = 'proc: ' + @Proc + '; line: ' + CAST(@Line AS VARCHAR) + '; msg: ' + @Exception  
      SELECT  @Message -- Displays on Concole when debugging.

      -- Log the issue to the Application Log.
      INSERT  INTO dbo.ApplicationLog
              ( LogDate ,
                Thread ,
                Level ,
                Logger ,
                Message ,
                Exception  
              )
      VALUES  ( GETDATE() , -- LogDate - datetime  
                'None' , -- Thread - varchar(255)  
                'FATAL' , -- Level - varchar(50)  
                '____up_CarePayBrokerReceiveXml' , -- Logger - varchar(255)  
                @Message , -- Message - varchar(4000)  
                @Exception  -- Exception - varchar(2000)  
              )  
      COMMIT -- We have stored the erronous message, and popped it off the queue. Commit these changes.
    END CATCH 
  END  -- end while  

END

Why are these messages staying there?

Details of a message that remain in the Initiator queue are:

Status: 1
Priority: 5
queuing_order: 395
mess_sequence_number: 0
service_name: //IcmsCarePay/Service/Initiator
service_contract_name: //IcmsCarePay/Contract
message_type_name: http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog
message_type_id: 2
validation: E
message_body: NULL
casted_message_body: NULL
1

There are 1 answers

10
Roger Wolf On BEST ANSWER

Looks like you use one-time dialogs for these conversations. Your target stored proc retrieves messages from target queue and then closes their dialogs, but you don't handle it on the initiator queue.

Since dialog is a distributed thing, in order to be closed, it has to be closed on both the initiator and the target sides. When your target proc issues end conversation @Handle; on the target, Service Broker sends the message of the type you mentioned to the initiator, to inform it that this particular dialog is history.

Being done properly, the initiator activation procedure will receive this message, issue the corresponding end conversation on its side, and the dialog is closed.

As you do not process any messages on the initiator side, these system messages accumulate there.

2 solutions here are possible:

  1. Handle EndDialog messages. This actually should be done on both sides, because dialog can be closed on its either side.
  2. Re-use dialogs, so that you don't have to create a new one each time you need to send something. It will save some significant resources, especially if the traffic is thick enough.

Note that #1 should be done regardless of whether you will use persistent or one-time dialogs.

EDIT: Here is an example of the default processing procedure, taken from one of my projects:

create procedure [dbo].[ssb_Queue_DefaultProcessor]
(
    @Handle uniqueidentifier,
    @MessageType sysname,
    @Body xml,
    @ProcId int
) with execute as owner as

set nocount, ansi_nulls, ansi_padding, ansi_warnings, concat_null_yields_null, quoted_identifier, arithabort on;
set numeric_roundabort, xact_abort, implicit_transactions off;

declare @Error int, @ErrorMessage nvarchar(2048);

declare @Action varchar(20);

begin try

-- System stuff
if @MessageType in (
    N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
    N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    ) begin

    -- Depending on the actual message, action type will be different
    if @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' begin
        set @Action = 'PURGE';
    end else if @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
        set @Action = 'CLOSE';

    -- Close the dialog
    exec dbo.ssb_DialogPools_Maintain @Action = @Action, @DialogHandle = @Handle, @Error = @Error output, @ErrorMessage = @ErrorMessage output;

    if nullif(@Error, 0) is not null
        throw 50000, @ErrorMessage, 1;

end else
    -- Some unknown messages may end up here, log them
    throw 50011, 'Unknown message type has been passed into default processor.', 1;

end try
begin catch

if nullif(@Error, 0) is null
    select @Error = error_number(), @ErrorMessage = error_message();

-- Don't try to resend messages from default processing
exec dbo.ssb_Poison_Log @ErrorNumber = @Error, @ErrorMessage = @ErrorMessage, @MessageType = @MessageType, @MessageBody = @Body, @ProcId = @ProcId;

end catch;
return;

It is called from all activation procs when they encounter any type of message other than what they are supposed to handle. Below is an example of one of such activation procedures:

create procedure [dbo].[ssb_QProcessor_Clients]
with execute as owner as


set nocount, ansi_nulls, ansi_padding, ansi_warnings, concat_null_yields_null, quoted_identifier, arithabort on;
set numeric_roundabort, xact_abort, implicit_transactions off;

declare @Handle uniqueidentifier, @MessageType sysname, @Body xml, @MessageTypeId int;
declare @Error int, @ErrorMessage nvarchar(2048), @ProcId int = @@procid;
declare @TS datetime2(4), @Diff int, @Delay datetime;


-- Fast entry check for queue contents
if not exists (select 0 from dbo.ssb_OY_Clients with (nolock))
    return;

while exists (select 0 from sys.service_queues where name = 'ssb_OY_Clients' and is_receive_enabled = 1) begin

    begin try
    begin tran;

    -- Receive something, if any
    waitfor (
        receive top (1) @Handle = conversation_handle,
            @MessageType = message_type_name,
            @Body = message_body
        from dbo.ssb_OY_Clients
    ), timeout 3000;

    if @Handle is null begin

        -- Empty, get out
        rollback;
        break;

    end;

    -- Check for allowed message type
    select @MessageTypeId = mt.Id
    from dbo.ExportMessageTypes mt
        inner join dbo.ExportSystems xs on xs.Id = mt.ExportSystemId
    where mt.MessageTypeName = @MessageType
        and xs.Name = N'AUDIT.OY.Clients';

    if @MessageTypeId is not null begin

        -- Store the data
        exec dbo.log_Clients @MessageType = @MessageType, @Body = @Body, @Error = @Error output, @ErrorMessage = @ErrorMessage output;

        -- Check the result
        if nullif(@Error, 0) is not null
            throw 50000, @ErrorMessage, 1;

    end else
        -- Put it into default processor
        exec dbo.ssb_Queue_DefaultProcessor @Handle = @Handle, @MessageType = @MessageType, @Body = @Body, @ProcId = @ProcId;

    commit;
    end try
    begin catch

    if nullif(@Error, 0) is null
        select @Error = error_number(), @ErrorMessage = error_message();

    -- Check commitability of the transaction
    if xact_state() = -1
        rollback;
    else if xact_state() = 1
        commit;

    -- Try to resend the message again
    exec dbo.[ssb_Poison_Retry] @MessageType = @MessageType, @MessageBody = @Body, @ProcId = @ProcId, @ErrorNumber = @Error, @ErrorMessage = @ErrorMessage;

    end catch;

    -- Reset dialog handle
    select @Handle = null, @Error = null, @ErrorMessage = null;

end;

-- Done!
return;

Of course, it's a bit more in this example than you might need, but I hope the general approach is apparent. And you need to handle EndDialog and Error message types on both initiator and target, because you never know where they will appear.