My Adventures in Coding

October 23, 2007

SQL Server: Handling Failures Gracefully in Service Broker Queues

Filed under: SQL Server 2005 — Brian @ 8:23 am

In my previous post I provided an example of setting up a Service Broker queue. One of the issues you will most likely encounter soon after you start using Service Broker queues is a message that fails while being processed. With a single queue reader, all requests in the queue are processed strictly in order. The problem is that when an error occurs, the receive sproc from that example turns off activation on the queue, which prevents any other messages from being processed until someone intervenes and fixes the bad message (usually a poor systems administrator who is called in the middle of the night!).

What if you are using the queue to process messages where order is not that important? If a message fails, you would rather have it skipped and placed on a dead letter queue, so that it does not interrupt processing of the rest of the messages on the queue. Unfortunately Service Broker does not offer this feature out of the box, unlike many other queuing systems.

The following is an example of how to implement a simple dead letter queue for your Service Broker queue. Note: Many items, such as the schema, message type and contract, will be the same as the ones used by your main queue, because both queues handle the same messages.

STEP 1: Create a RECEIVE sproc for clearing the queue

This sproc will be used to empty the dead letter queue when you want to reprocess the messages sitting in it. This may seem like a lot of SQL for the task, but it turns reprocessing the entries in the queue later into a single line of SQL.

CREATE PROCEDURE [dbo].[fooDeadLetterReceive]
AS
SET NOCOUNT ON;
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @mt SYSNAME;
DECLARE @MessageBody XML;

BEGIN TRANSACTION;
WAITFOR (RECEIVE TOP (1) @dh = conversation_handle,
	@mt = message_type_name,
	@MessageBody = message_body
	FROM [dbo].[fooDeadLetterQueue]), TIMEOUT 1000;
WHILE (@dh IS NOT NULL)
BEGIN
	BEGIN TRY
		
		--Send message back to the main queue to be processed
		DECLARE @handle uniqueidentifier;

		IF (convert(varchar, @MessageBody.query('count(/Root/*)')) != 0)
		BEGIN
			BEGIN DIALOG CONVERSATION	@handle
			FROM SERVICE	fooService
			TO SERVICE	'fooService'
			ON CONTRACT	fooMessageContract
			WITH ENCRYPTION = OFF;
			SEND
			ON CONVERSATION	@handle
			MESSAGE TYPE	fooMessage (@MessageBody);
			END CONVERSATION @handle;
		END
		
		END CONVERSATION @dh;

		COMMIT;

	END TRY
	BEGIN CATCH
			-- If there are any exceptions then roll back the transaction
			-- (this puts the message back on the dead letter queue)
			-- and stop processing the queue
			IF (XACT_STATE() <> 0) ROLLBACK TRANSACTION;
			ALTER QUEUE [dbo].[fooDeadLetterQueue] WITH ACTIVATION ( STATUS = OFF );
			BEGIN TRANSACTION;
			BREAK;
	END CATCH
	-- Try to loop once more if there are more messages
	SELECT @dh = NULL;
	BEGIN TRANSACTION;
	WAITFOR (RECEIVE TOP (1) @dh = conversation_handle,
		@mt = message_type_name,
		@MessageBody = message_body
		FROM [dbo].[fooDeadLetterQueue]), TIMEOUT 1000;
END
COMMIT;

--Shutdown processing of the deadletter queue when we have gone through
--all existing messages in the queue
ALTER QUEUE [dbo].[fooDeadLetterQueue] WITH ACTIVATION ( STATUS = OFF );
GO

STEP 2: Create a SEND sproc for inserting messages into the queue

This sproc will be used later to insert messages into the dead letter queue when the main queue runs into a problem processing a message (it is called from the CATCH block of the main queue's receive sproc).

CREATE PROCEDURE [dbo].[fooDeadLetterSend]
	@body XML
AS
DECLARE @handle uniqueidentifier;

IF (convert(varchar, @body.query('count(/Root/*)')) != 0)
BEGIN
	BEGIN DIALOG CONVERSATION	@handle
	FROM SERVICE	fooDeadLetterService
	TO SERVICE	'fooDeadLetterService'
	ON CONTRACT	fooMessageContract
	WITH ENCRYPTION = OFF;		
	SEND
	ON CONVERSATION	@handle
	MESSAGE TYPE	fooMessage (@body);
	END CONVERSATION @handle;
END
GO

STEP 3: Create a Dead Letter queue

Nothing special here, just create a new queue called fooDeadLetterQueue. We also attach the activation sproc that will be used for reprocessing failed messages.

CREATE QUEUE [dbo].[fooDeadLetterQueue] WITH STATUS = ON , RETENTION = OFF , 
ACTIVATION (  STATUS = OFF , PROCEDURE_NAME = [dbo].[fooDeadLetterReceive] , 
MAX_QUEUE_READERS = 1 , EXECUTE AS OWNER  ) ON [PRIMARY]

Also note that activation on the queue is disabled for now (STATUS = OFF inside the ACTIVATION clause). This is done on purpose. On a normal queue we always enable activation so that any message received by the queue is processed immediately. In the case of a dead letter queue we do not want messages processed until they have been checked and fixed.
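
The send sproc from Step 2 opens its dialog to a service called fooDeadLetterService, so that service also has to exist before any message can be parked. A minimal sketch, assuming the service is bound to the new queue with the same fooMessageContract that the main queue uses (this mirrors the way fooService is created in the previous post):

```sql
-- Assumption: the dead letter service reuses fooMessageContract from the main queue.
CREATE SERVICE [fooDeadLetterService]
AUTHORIZATION [dbo] ON QUEUE [dbo].[fooDeadLetterQueue] ([fooMessageContract])
GO
```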

STEP 4: Update the Catch block in the receive sproc of your main queue

Normally, if an exception is thrown while processing a message, the CATCH block of the receive sproc is just used as a place to log the error and then turn activation off on the queue so that no more messages are processed.

With your new dead letter queue, rather than turning off the main queue, we take the message that failed and send it to the dead letter queue. The problem message is then removed from the main queue and we can continue processing the rest of the messages in our queue. Now those systems administrators can finally relax.

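BEGIN CATCH
	-- If there are any exceptions then roll back the failed work.
	-- The rollback also puts the message back on the main queue.
	IF (XACT_STATE() <> 0) ROLLBACK TRANSACTION;

	-- Take the problem message off the main queue again, send it to the
	-- dead letter queue and end its conversation, all in one transaction
	BEGIN TRANSACTION;
	RECEIVE TOP (1) @mt = message_type_name,
		@MessageBody = message_body
		FROM [dbo].[fooQueue]
		WHERE conversation_handle = @dh;
	EXEC dbo.fooDeadLetterSend @MessageBody;
	END CONVERSATION @dh;
	COMMIT;
END CATCH
-- Try to loop once more if there are more messages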

STEP 5: Reprocessing messages from the dead letter queue

To reprocess the queue you simply have to copy the messages in the dead letter queue back to the main queue. Now that RECEIVE sproc we wrote earlier comes in handy. Simply turn activation on for the dead letter queue. This will take all existing messages on the queue and push them back to the main queue for processing. When the dead letter queue is empty it shuts itself off again.

ALTER QUEUE dbo.fooDeadLetterQueue
WITH STATUS = ON , RETENTION = OFF ,
ACTIVATION ( STATUS = ON )
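
Before switching activation on, it can be worth looking at what is actually parked in the dead letter queue. A queue can be read with a plain SELECT, which does not remove anything from it. A small sketch, assuming the queue name used above:

```sql
-- Peek at the parked messages without removing them from the queue.
SELECT conversation_handle,
       message_type_name,
       CAST(message_body AS XML) AS MessageBody
FROM [dbo].[fooDeadLetterQueue];

-- Check whether activation is currently on (1) or off (0).
SELECT name, is_activation_enabled
FROM sys.service_queues
WHERE name = 'fooDeadLetterQueue';
```

Running the first query again after reprocessing should return no rows once the queue has drained.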

October 15, 2007

Service Broker Tutorial – SQL Server 2005

Filed under: SQL Server 2005 — Brian @ 5:46 pm

The following is meant as a “Quick Start” tutorial for anyone needing to create a Service Broker queue under extreme time pressure! This is a generalized example of a db script for a new Service Broker queue called fooQueue. Replace the attributes foo1 and foo2 with whatever attributes (and types) you need in your message. Replace foo with a relevant name, e.g. CustomerOrders. Replace schemaName with dbo or with whatever schema name you are using. (I have experienced tricky permissions issues in the past with schema names other than dbo.)

NOTE: During our use of Service Broker, we ran into a number of “Gotchas”. The bonus of this example is that the fixes for those are built in. So if you need a high performance Service Broker queue where those problems have been ironed out, feel free to cut and paste this example.

STEP 1: Create the message schema

This schema defines the XML messages that can be placed onto the service broker queue.

CREATE XML SCHEMA COLLECTION [schemaName].[fooMessageXmlSchema] 
AS N'<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
      <xsd:complexType name="foo">
            <xsd:attribute name="foo1" type="xsd:integer" />
            <xsd:attribute name="foo2" type="xsd:integer" />
      </xsd:complexType>
      <xsd:complexType name="Root">
            <xsd:sequence>
                 <xsd:element name="foo" type="foo" minOccurs="0" maxOccurs="unbounded"/>
            </xsd:sequence>
      </xsd:complexType>
      <xsd:element name="Root" type="Root" />
</xsd:schema>'
GO

STEP 2: Create the Send Request Stored Procedure

This procedure is responsible for taking a properly formatted message and sending it to the service broker queue.

CREATE PROCEDURE [schemaName].[fooSend]
      @body XML (DOCUMENT fooMessageXmlSchema)
AS
      DECLARE @handle uniqueidentifier;

      IF (convert(varchar, @body.query('count(/Root/*)')) != 0)
      BEGIN
            BEGIN DIALOG CONVERSATION     @handle
            FROM SERVICE      fooService
            TO SERVICE  'fooService'
            ON CONTRACT fooMessageContract
            WITH ENCRYPTION = OFF;
            SEND
            ON CONVERSATION   @handle
            MESSAGE TYPE      fooMessage (@body);
            END CONVERSATION @handle;
      END
GO

STEP 3: Send Stored Procedure Caller

This piece of code creates a new message that can then be placed on the queue. Place this code inside some calling stored procedure and/or trigger.

DECLARE @body XML (DOCUMENT fooMessageXmlSchema);
      SELECT @body =  (
           SELECT (
                SELECT
                     i.foo1 as "@foo1",
                     i.foo2 as "@foo2"
                FROM inserted i
                FOR XML PATH ('foo'), TYPE
           ) FOR XML PATH ('Root'), TYPE
     );
      EXEC schemaName.fooSend @body;
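
The inserted table referenced above only exists inside a trigger, so here is a rough sketch of the same code wrapped in one. The trigger name and the table fooTable (with columns foo1 and foo2) are made up for illustration; only fooSend and the XML shape come from this tutorial.

```sql
-- Hypothetical trigger and table names; adjust to your own schema.
CREATE TRIGGER [schemaName].[fooTableAfterInsert]
ON [schemaName].[fooTable]
AFTER INSERT
AS
BEGIN
      SET NOCOUNT ON;
      DECLARE @body XML (DOCUMENT fooMessageXmlSchema);

      -- Build one Root document with a foo element per inserted row
      SELECT @body =  (
           SELECT (
                SELECT
                     i.foo1 as "@foo1",
                     i.foo2 as "@foo2"
                FROM inserted i
                FOR XML PATH ('foo'), TYPE
           ) FOR XML PATH ('Root'), TYPE
      );

      -- fooSend skips the send when the message contains no foo elements
      EXEC schemaName.fooSend @body;
END
GO
```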

STEP 4: Create the Receive Stored Procedure

The receive stored procedure is where the bulk of your work will be done. This is the sproc that will be called by the service broker queue every time a new message is inserted into the queue.

CREATE PROCEDURE [schemaName].[fooReceive]
AS
SET NOCOUNT ON;
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @mt SYSNAME;
DECLARE @MessageBody XML (DOCUMENT fooMessageXmlSchema);
DECLARE @foo1 int;
DECLARE @foo2 int;
DECLARE @fooCount int;
DECLARE @fooCurrent int;
BEGIN TRANSACTION;
WAITFOR (RECEIVE TOP (1) @dh = conversation_handle,
      @mt = message_type_name,
      @MessageBody = message_body
      FROM [schemaName].[fooQueue]), TIMEOUT 1000;
WHILE (@dh IS NOT NULL)
BEGIN
      BEGIN TRY
            IF @mt = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
            BEGIN
                  END CONVERSATION @dh;
            END
            ELSE IF (@mt = N'fooMessage')
            BEGIN
                  SET @fooCount = @MessageBody.value('count(/Root/foo)', 'int');
                  SET @fooCurrent = 1;

                  WHILE (@fooCurrent <= @fooCount)
                  BEGIN
                        --set variable values from xml message body
                        SELECT @foo1 = Root.foo.value('@foo1','int'),
                               @foo2 = Root.foo.value('@foo2','int')
                        FROM @MessageBody.nodes('(/Root/foo[sql:variable("@fooCurrent")])') AS Root(foo)

                        -- DO YOUR WORK HERE

                        -- increment the position

                        SET @fooCurrent = @fooCurrent + 1;
                  END
                  END CONVERSATION @dh;
            END
            COMMIT;
      END TRY
      BEGIN CATCH
                  -- If there are any exceptions then roll back the transaction
                  -- (this puts the message back on the queue), log the error
                  -- and stop processing the queue
                  IF (XACT_STATE() <> 0) ROLLBACK TRANSACTION;
                  INSERT INTO ServiceBrokerErrors (queuename, errornumber, errormessage, errordate)
                  VALUES ('fooQueue', ERROR_NUMBER(), ERROR_MESSAGE(), getdate());
                  ALTER QUEUE [schemaName].[fooQueue] WITH ACTIVATION ( STATUS = OFF );
                  BEGIN TRANSACTION;
                  BREAK;
      END CATCH
      -- Try to loop once more if there are more messages
      SELECT @dh = NULL;
      BEGIN TRANSACTION;
      WAITFOR (RECEIVE TOP (1) @dh = conversation_handle,
            @mt = message_type_name,
            @MessageBody = message_body
            FROM [schemaName].[fooQueue]), TIMEOUT 1000;
END
COMMIT;
GO
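
The CATCH block above logs to a table called ServiceBrokerErrors, which this script does not create. A possible definition is sketched below; the column names come from the INSERT statement, while the schema (dbo) and the data types are assumptions.

```sql
-- Assumed shape of the error log table used by fooReceive.
CREATE TABLE [dbo].[ServiceBrokerErrors]
(
      queuename     SYSNAME        NOT NULL,  -- name of the queue that failed
      errornumber   INT            NOT NULL,  -- value of ERROR_NUMBER()
      errormessage  NVARCHAR(4000) NULL,      -- value of ERROR_MESSAGE()
      errordate     DATETIME       NOT NULL   -- when the error was logged
)
GO
```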

Note: The RECEIVE TOP (1) used in the above stored procedure is good for a single reader. If you need to use multiple readers, you can use the following to receive all messages for a conversation group:

DECLARE  @Messages TABLE (MessageBody XML, MessageTypeName SYSNAME, ddh UNIQUEIDENTIFIER)
WAITFOR (RECEIVE  message_body, message_type_name, conversation_handle
	FROM [schemaName].[fooQueue]
	INTO @Messages), TIMEOUT 1000

You can then loop through the messages in the table variable and end each conversation appropriately.
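
One way to write that loop is with a cursor over the table variable. This is only a sketch: it assumes @Messages has been declared and filled as shown above, and, unlike fooReceive, it ends a conversation only when the EndDialog or Error message for it arrives, so that the same handle is never ended twice within one batch.

```sql
DECLARE @MessageBody XML;
DECLARE @mt SYSNAME;
DECLARE @dh UNIQUEIDENTIFIER;

DECLARE MessageCursor CURSOR LOCAL FAST_FORWARD FOR
      SELECT MessageBody, MessageTypeName, ddh FROM @Messages;

OPEN MessageCursor;
FETCH NEXT FROM MessageCursor INTO @MessageBody, @mt, @dh;
WHILE (@@FETCH_STATUS = 0)
BEGIN
      IF (@mt = N'fooMessage')
      BEGIN
            -- DO YOUR WORK HERE (placeholder statement so the block compiles)
            PRINT 'Processing fooMessage';
      END
      ELSE IF (@mt = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
            OR @mt = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
      BEGIN
            -- The other side is finished with this conversation, so close our end
            END CONVERSATION @dh;
      END
      FETCH NEXT FROM MessageCursor INTO @MessageBody, @mt, @dh;
END
CLOSE MessageCursor;
DEALLOCATE MessageCursor;
```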

STEP 5: Setting up the Service Broker Queue

Create the Message Type for the queue. This states what the queue will accept as a valid message. In this Message Type we are saying it must be an XML document and must conform to the schema fooMessageXmlSchema.

CREATE MESSAGE TYPE [fooMessage] AUTHORIZATION [dbo] VALIDATION = VALID_XML
WITH SCHEMA COLLECTION [schemaName].[fooMessageXmlSchema]
GO

The Contract states which Message Types may be sent on a conversation and by which side. Here it references our Message Type and allows it to be sent by the initiator.

CREATE CONTRACT [fooMessageContract]
AUTHORIZATION [dbo] ([fooMessage] SENT BY INITIATOR)
GO

Create the new service broker queue.
Note: When creating the service broker queue, the Activation section allows you to attach the receive stored procedure (e.g., fooReceive) that you have already created. This stored procedure will be called any time a new message is inserted into the queue.

CREATE QUEUE [schemaName].[fooQueue]
WITH STATUS = ON , RETENTION = OFF ,
ACTIVATION 
(
      STATUS = ON ,
      PROCEDURE_NAME = [schemaName].[fooReceive] ,
      MAX_QUEUE_READERS = 1 ,
      EXECUTE AS OWNER
)
ON [PRIMARY]
GO

Finally, create a new Service to run the new queue.

CREATE SERVICE [fooService]
AUTHORIZATION [dbo]  ON QUEUE [schemaName].[fooQueue] ([fooMessageContract])
GO
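
Once everything is created, a quick smoke test helps confirm that messages actually flow. The following is a sketch rather than part of the original script; the attribute values are made up, and it assumes the objects were created with the names used above.

```sql
-- Service Broker must be enabled in the database (1 = enabled).
SELECT name, is_broker_enabled FROM sys.databases WHERE name = DB_NAME();

-- Send one test message with made-up attribute values.
DECLARE @body XML (DOCUMENT fooMessageXmlSchema);
SET @body = N'<Root><foo foo1="1" foo2="2" /></Root>';
EXEC schemaName.fooSend @body;

-- With activation on, fooReceive should pick the message up shortly.
-- Anything still waiting in the queue shows up here.
SELECT conversation_handle, message_type_name FROM [schemaName].[fooQueue];

-- Messages that could not be delivered wait in the transmission queue,
-- and transmission_status usually says why.
SELECT to_service_name, transmission_status FROM sys.transmission_queue;
```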

Well that is all! I hope this helps!
