Service Broker performance using DialogPool - any way to improve?

Years ago, we implemented Service Broker, and a year or so later added something like the dialog pool described here (https://techcommunity.microsoft.com/t5/sql-server-blog/reusing-dialogs-with-a-dialog-pool/ba-p/383461) to improve performance. Now that volume has increased (100+ sources calling the stored procedure, upwards of 1 million messages an hour), we are seeing it slowly grind to a halt until the SQL Server service needs to be restarted. When everything is working fine it can process 800k messages an hour, but during a slowdown throughput can drop to 50k/hour.

The problem is somewhere in DialogPool and the blocking/waits around accessing it, but I've forgotten enough about Service Broker over the years that I no longer know where to start fixing it. The "150 trick" comes to mind: we never implemented it, but its premise was to use only every 150th conversation, so that the conversations in use would span pages. However, I don't remember where I'd do that in this code, or whether it would even apply. The activated stored procedure is running 30 threads on a 32-core box.
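
To confirm where the waits actually are, a query along these lines could be run while a slowdown is in progress. This is only a sketch built on the standard DMVs sys.dm_exec_requests and sys.dm_exec_sql_text; it assumes VIEW SERVER STATE permission and is not part of our current code.

```sql
-- Sketch: list active requests that are currently waiting, longest wait first.
-- Intended to be run during a slowdown, from a separate session.
SELECT  r.session_id,
        r.blocking_session_id,          -- 0 when the request is not blocked by another session
        r.wait_type,
        r.wait_time     AS wait_time_ms,
        r.wait_resource,
        r.status,
        r.command,
        t.text          AS batch_text   -- NULL when no SQL text is available
FROM    sys.dm_exec_requests AS r
OUTER APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
WHERE   r.session_id <> @@SPID          -- exclude this diagnostic session
AND     r.wait_type IS NOT NULL
ORDER BY r.wait_time DESC;
```

If the longest waits are LCK_M_U or LCK_M_X on a wait_resource that resolves to DialogPool, that would point at lock blocking on the pool table. PAGELATCH_* waits would instead point at page latch contention, which, as I understand it, is the kind of contention the "150 trick" targets.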

I'm trying to figure out if there's a way to stop this from requiring restarts multiple times a day. Microsoft seemed at a loss. My next step would be to change DialogPool into an in-memory table.

The initial stored procedure is this:

SET ANSI_NULLS ON
GO


CREATE  PROCEDURE [svcBroker].[BundleParse_SvcBroker_INS] (@Message XML)
AS
SET NOCOUNT ON

DECLARE @fromService sysname,
        @toService sysname,
        @onContract sysname,
        @messageType NVARCHAR(128),
        @messageBody NVARCHAR(MAX)

SET @fromService = 'DynamicParser\\my_InitiatorService'
SET @toService = 'DynamicParser\\my_TargetService'
SET @onContract = 'svcBroker_ods_claim_contract'
SET @messageType = 'svcBroker_ods_claim_request'
SET @messageBody = CONVERT(NVARCHAR(MAX), @message)

EXEC svcBroker.usp_send @fromService, @toService, @onContract, @messageType, @messageBody

GO

The DialogPool table:

CREATE TABLE [svcBroker].[DialogPool]
(
[FromService] [sys].[sysname] NOT NULL,
[ToService] [sys].[sysname] NOT NULL,
[OnContract] [sys].[sysname] NOT NULL,
[Handle] [uniqueidentifier] NOT NULL,
[OwnerSPID] [int] NOT NULL,
[CreationTime] [datetime] NOT NULL,
[SendCount] [bigint] NOT NULL
) ON [PRIMARY]
GO
ALTER TABLE [svcBroker].[DialogPool] ADD CONSTRAINT [UQ__DialogPo__FE5BB31A4CC05EF3] UNIQUE NONCLUSTERED ([Handle]) ON [PRIMARY]
GO

usp_send:

SET QUOTED_IDENTIFIER ON
SET ANSI_NULLS ON
GO

CREATE PROCEDURE [svcBroker].[usp_send] (
      @fromService SYSNAME,
      @toService SYSNAME,
      @onContract SYSNAME,
      @messageType SYSNAME,
      @messageBody NVARCHAR(MAX))
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @dialogHandle UNIQUEIDENTIFIER;
    DECLARE @sendCount BIGINT;     
    DECLARE @counter INT;
    DECLARE @error INT;
 
    SELECT @counter = 1;

--select 'svcBroker.usp_send ', @fromService, @toService, @onContract, @messageType, @messageBody

    BEGIN TRANSACTION;
        -- Will need a loop to retry in case the dialog is
        -- in a state that does not allow transmission
        --
        WHILE (1=1)
            BEGIN
                -- Claim a dialog from the dialog pool.
                -- A new one will be created if none are available.
                --
                EXEC svcBroker.usp_get_dialog @fromService, @toService, @onContract, @dialogHandle OUTPUT, @sendCount OUTPUT;
 
                -- Attempt to SEND on the dialog
                --
                IF (@messageBody IS NOT NULL)
                BEGIN
                    -- If the @messageBody is not null it must be sent explicitly
                    SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType (@messageBody);
                END
                ELSE
                BEGIN
                    -- Messages with no body must *not* specify the body,
                    -- cannot send a NULL value argument
                    SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType;
                END
                 
                SELECT @error = @@ERROR;
                
                IF @error = 0
                BEGIN
                    -- Successful send, increment count and exit the loop --
                    SET @sendCount = @sendCount + 1;
                    BREAK;
                END
           
                SELECT @counter = @counter+1;
                
                IF @counter > 10
                BEGIN
                    -- We failed 10 times in a  row, something must be broken --
                    RAISERROR('Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;
                    BREAK;
                END
 
                -- Delete the associated dialog from the table and try again --
                EXEC svcBroker.usp_delete_dialog @dialogHandle;
    
                SELECT @dialogHandle = NULL;
        END
 
        -- "Criterion" for dialog pool removal is send count > 1000.
        -- Modify to suit application.
        -- When deleting also inform the target to end the dialog.
        IF @sendCount > 1000
        BEGIN
            EXEC svcBroker.usp_delete_dialog @dialogHandle ;
            SEND ON CONVERSATION @dialogHandle MESSAGE TYPE [svcBroker_EndOfStream];
        END
        ELSE
        BEGIN
            -- Free the dialog.
            EXEC svcBroker.usp_free_dialog @dialogHandle, @sendCount;
        END
    COMMIT
END;

GO

Our usp_get_dialog is:

SET ANSI_NULLS ON
GO

CREATE PROCEDURE [svcBroker].[usp_get_dialog] (
      @fromService SYSNAME,
      @toService SYSNAME,
      @onContract SYSNAME,
      @dialogHandle UNIQUEIDENTIFIER OUTPUT,
      @sendCount BIGINT OUTPUT)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @dialog TABLE (
            FromService SYSNAME NOT NULL,
            ToService SYSNAME NOT NULL,
            OnContract SYSNAME NOT NULL,
            Handle UNIQUEIDENTIFIER NOT NULL,
            OwnerSPID INT NOT NULL,
            CreationTime DATETIME NOT NULL,
            SendCount BIGINT NOT NULL);
 
    -- Try to claim an unused dialog in [DialogPool]
    -- READPAST option avoids blocking on locked dialogs.
    SET TRANSACTION ISOLATION LEVEL READ COMMITTED
    
    BEGIN TRANSACTION;
        DELETE @dialog;

        UPDATE  TOP(1) svcBroker.DialogPool WITH(READPAST)
        SET     OwnerSPID = @@SPID
        OUTPUT INSERTED.* INTO @dialog 
        WHERE   FromService = @fromService
        AND     ToService = @toService
        AND     OnContract = @OnContract
        AND     OwnerSPID = -1;
        
        IF @@ROWCOUNT > 0
        BEGIN
            SET @dialogHandle = (SELECT Handle FROM @dialog);
            SET @sendCount = (SELECT SendCount FROM @dialog);          
        END
        ELSE
        BEGIN
            -- No free dialogs: need to create a new one
            BEGIN DIALOG CONVERSATION @dialogHandle
            FROM SERVICE @fromService
            TO SERVICE @toService
            ON CONTRACT @onContract
            WITH ENCRYPTION = OFF;
            
            INSERT  INTO svcBroker.DialogPool (
                    FromService, ToService, OnContract, Handle, OwnerSPID, CreationTime, SendCount)
            VALUES (@fromService, @toService, @onContract, @dialogHandle, @@SPID, GETDATE(), 0);

            SET @sendCount = 0;
        END
    COMMIT
END;
GO

And usp_delete_dialog looks like this:

CREATE PROCEDURE [svcBroker].[usp_delete_dialog] (
        @dialogHandle UNIQUEIDENTIFIER)
AS

BEGIN
      SET NOCOUNT ON;

      BEGIN TRANSACTION;

      DELETE svcBroker.DialogPool WHERE Handle = @dialogHandle;

      COMMIT
END;

GO

And finally, our activated stored procedure:

SET QUOTED_IDENTIFIER ON
SET ANSI_NULLS ON
GO

CREATE PROCEDURE [svcBroker].[ODS_TargetQueue_Receive]
AS

BEGIN
    set nocount on

    -- Variable table for received messages.
    DECLARE @receive_table TABLE(
            queuing_order BIGINT,
            conversation_handle UNIQUEIDENTIFIER,
            message_type_name SYSNAME,
            message_body xml);
  
    -- Cursor for received message table.

    DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY
            FOR SELECT
            conversation_handle,
            message_type_name,
            message_body
            FROM @receive_table ORDER BY queuing_order;
 
     DECLARE @conversation_handle UNIQUEIDENTIFIER;
     DECLARE @message_type SYSNAME;
     DECLARE @message_body xml;
 
     -- Error variables.
     DECLARE @error_number INT;
     DECLARE @error_message VARCHAR(4000);
     DECLARE @error_severity INT;
     DECLARE @error_state INT;
     DECLARE @error_procedure SYSNAME;
     DECLARE @error_line INT;
     DECLARE @error_dialog VARCHAR(50);

     BEGIN TRY
       WHILE (1 = 1)
       BEGIN
         BEGIN TRANSACTION;

         -- Receive up to 1000 available messages into the table.
         -- Wait up to 2 seconds for messages (TIMEOUT is in milliseconds).

         WAITFOR (
            RECEIVE TOP (1000)
               [queuing_order],
               [conversation_handle],
               [message_type_name],
               convert(xml, [message_body])
            FROM svcBroker.ODS_TargetQueue
            INTO @receive_table
         ), TIMEOUT 2000;

         IF @@ROWCOUNT = 0
         BEGIN

              COMMIT;
              BREAK;
         END
         ELSE
         BEGIN
              OPEN message_cursor;
              WHILE (1=1)
              BEGIN
                  FETCH NEXT FROM message_cursor
                            INTO @conversation_handle,
                                 @message_type,
                                 @message_body;
       
                  IF (@@FETCH_STATUS != 0) BREAK;
 
                  -- Process a message.
                  -- If an exception occurs, catch and attempt to recover.

                  BEGIN TRY
 
                      IF @message_type = 'svcBroker_ods_claim_request'
                      BEGIN
                          exec ParseMessages @message_body
                      END
                      ELSE IF @message_type in ('svcBroker_EndOfStream', 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
                      BEGIN
                          -- initiator is signaling end of message stream: end the dialog
                          END CONVERSATION @conversation_handle;
                      END
                      ELSE IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
                      BEGIN
                           -- If the message_type indicates that the message is an error,
                           -- raise the error and end the conversation.
                           WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)
                           SELECT
                           @error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),
                           @error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');
                           SET @error_dialog = CAST(@conversation_handle AS VARCHAR(50));
                           RAISERROR('Error in dialog %s: %s (%i)', 16, 1, @error_dialog, @error_message, @error_number);
                           END CONVERSATION @conversation_handle;
                      END
                  END TRY
                  BEGIN CATCH
                     SET @error_number = ERROR_NUMBER();
                     SET @error_message = ERROR_MESSAGE();
                     SET @error_severity = ERROR_SEVERITY();
                     SET @error_state = ERROR_STATE();

                     SET @error_procedure = ERROR_PROCEDURE();

                     SET @error_line = ERROR_LINE();

                     IF XACT_STATE() = -1
                     BEGIN
                          -- The transaction is doomed. Only rollback possible.
                          -- This could disable the queue if done 5 times consecutively!
                          ROLLBACK TRANSACTION;
             
                          -- Record the error.
                          BEGIN TRANSACTION;
                            INSERT  INTO svcBroker.target_processing_errors (
                                    error_conversation,[error_number],[error_message],[error_severity],
                                    [error_state],[error_procedure],[error_line],[doomed_transaction],
                                    [message_body])
                            VALUES  (NULL, @error_number, @error_message,@error_severity,
                                    @error_state, @error_procedure, @error_line, 1, @message_body);
                          COMMIT;
 
                          -- For this level of error, it is best to exit the proc
                          -- and give the queue monitor control.
                          -- Breaking to the outer catch will accomplish this.
                          RAISERROR ('Message processing error', 16, 1);
                     END
                     ELSE IF XACT_STATE() = 1
                     BEGIN
                          -- Record error and continue processing messages.
                          -- Failing message could also be put aside for later processing here.
                          -- Otherwise it will be discarded.
                            INSERT  INTO svcBroker.target_processing_errors (
                                    error_conversation,[error_number],[error_message],[error_severity],
                                    [error_state],[error_procedure],[error_line],[doomed_transaction],
                                    [message_body])
                            VALUES  (NULL, @error_number, @error_message,@error_severity,
                                    @error_state, @error_procedure, @error_line, 0, @message_body);
                     END
                  END CATCH
              END
              CLOSE message_cursor;
              DELETE @receive_table;
         END
         COMMIT;
       END
     END TRY
     BEGIN CATCH
         -- Process the error and exit the proc to give the queue monitor control
         SET @error_number = ERROR_NUMBER();
         SET @error_message = ERROR_MESSAGE();
         SET @error_severity = ERROR_SEVERITY();
         SET @error_state = ERROR_STATE();
         SET @error_procedure = ERROR_PROCEDURE();
         SET @error_line = ERROR_LINE();
 
         IF XACT_STATE() = -1
         BEGIN
              -- The transaction is doomed. Only rollback possible.
              -- This could disable the queue if done 5 times consecutively!

              ROLLBACK TRANSACTION;

              -- Record the error.
              BEGIN TRANSACTION;
              INSERT INTO svcBroker.target_processing_errors (
                    error_conversation,[error_number],[error_message],[error_severity],
                    [error_state],[error_procedure],[error_line],[doomed_transaction],
                    [message_body])
              VALUES(NULL, @error_number, @error_message,@error_severity, @error_state, @error_procedure, @error_line, 1, @message_body);
              COMMIT;
         END
         ELSE IF XACT_STATE() = 1
         BEGIN
              -- Record error and commit transaction.
              -- Here you could also save anything else you want before exiting.
              INSERT INTO svcBroker.target_processing_errors (
                    error_conversation,[error_number],[error_message],[error_severity],
                    [error_state],[error_procedure],[error_line],[doomed_transaction],
                    [message_body])
              VALUES(NULL, @error_number, @error_message, @error_severity, @error_state, @error_procedure, @error_line, 0, @message_body);
              COMMIT;
         END
    END CATCH
END;

GO

The stored procedure ParseMessages only does work on the received message body; it shouldn't contain any Service Broker code.
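
Because the slowdown builds up gradually and clears after a restart, it may also be worth checking whether anything accumulates between restarts. The following is a minimal sketch, assuming it is run in the database that hosts the services. The only names taken from our code are svcBroker.DialogPool and the convention that OwnerSPID = -1 marks a free dialog; everything else is a standard catalog view.

```sql
-- Sketch: snapshot of Service Broker state, to compare between a healthy period and a slowdown.

-- Conversation endpoints by role and state.
SELECT  is_initiator,
        state_desc,
        COUNT(*) AS endpoint_count
FROM    sys.conversation_endpoints
GROUP BY is_initiator, state_desc
ORDER BY endpoint_count DESC;

-- Messages still waiting to be delivered.
SELECT  COUNT(*) AS pending_transmissions
FROM    sys.transmission_queue;

-- Pool size and how many dialogs are currently free.
-- NOLOCK is used so the diagnostic does not wait behind the senders.
SELECT  COUNT(*) AS pooled_dialogs,
        SUM(CASE WHEN OwnerSPID = -1 THEN 1 ELSE 0 END) AS free_dialogs
FROM    svcBroker.DialogPool WITH (NOLOCK);
```

A steadily growing number of endpoints in one state (for example CLOSED or DISCONNECTED_INBOUND), or a growing transmission queue, would suggest that cleanup is not keeping up rather than a problem in DialogPool itself.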
