Microsoft SQL Service Broker

What is Microsoft’s SQL service broker?
It’s a message queuing platform built into Microsoft’s SQL Server. It allows you to queue up jobs for the database to deal with asynchronously.

Why use it?
I’m no DBA, but in my case, I was working with a database server that was at full tilt and there was work that it could handle asynchronously to spread the load. There was data that needed processing but it did not need to be handled immediately, like data for some reports. This data could be placed on a queue to be processed by the database server in its own good time, evening out the demands on the server.

How does it work?
Well, it’s pretty straightforward, yet it looks a bit ridiculous written down. I think this is because, as a platform, it can handle a lot of data messaging scenarios and as such the syntax and examples usually given are quite generic.

Here’s the official guide, happy mining through all that!

You can find better ones here and here.

Now for my explanation…
( Warning: This is as I remember it working 2 years ago soooo I may be wrong! ).

There is no getting away from the terminology of the parts that make up the service broker. You have to have the gist of them to understand how it all works… again, they make more sense (to me) when working with the service broker than from any reading about it, but anyways… here is the explanation from one of the guides above, with my additions where I think it’s a little generic or jargony.

A Message is a piece of information exchanged between applications that use Service Broker. A message can optionally be validated to match a particular XML schema.
This is the data you are going to process; it can be formatted as XML for easier parsing.

A Conversation is a reliable, long-running, asynchronous exchange of messages.
Simple enough…

A Dialog is a conversation between two services. All Service Broker conversations are dialogs.
The exchange of your data from one table to another.

The Initiator is the participant that begins a dialog.
The stored procedure that initially places data in one queue to be sent to another to get processed.

The Target is the participant that accepts the dialog begun by the initiator.
The stored procedure that takes the data off the receiving queue to process it.

A Conversation Group is a group of related conversations. Every conversation belongs to exactly one conversation group.
Never had much dealings with this…

A Contract is an agreement between two services about the message types allowed in a conversation.
Pretty sure this just ties you to what message you are going to use…

A Queue stores the messages for a particular service.
Just a table for storing your message payload

A Service is a particular business task that can send and receive messages.
In practice, a named endpoint that sits on top of a queue; you send messages from one service to another rather than to the queues directly.

So let me give a brief explanation of where we were and what we wanted to do. We used stored procedures for all our data handling; we believed it to be best practice. We later found out it’s a nightmare to scale, but that’s another post… maybe.
Anyways, the front end of our application would call these stored procedures to process some data, either writing to or reading from the database. Sometimes, the time it took to process exceeded what the front end demanded. So we decided that we could place some of this data on a queue to be processed asynchronously… enter the service broker.

Again, I think it’s important to say that I am no DBA and there may be a thousand other uses and implementations for the service broker, hence the generic terminology I guess… but here is what I did…

Our service broker implementation:
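First off, you need to create the parts that make up the service broker. Just use the following code to get started. Here, you will create a message type that requires well-formed XML and a contract so the 2 services will know what message type to expect, then 2 queues (one for sending data and one for receiving messages), and lastly 2 services to manage these queues. The message type and contract come first because the receiving service has to name the contract it accepts; a service created without a contract can only start conversations, not be the target of one.

-- The message type and contract must exist before a service can reference them
CREATE MESSAGE TYPE my_message VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT my_contract (my_message SENT BY ANY);
CREATE QUEUE receiving_queue;
CREATE QUEUE sending_queue;
CREATE SERVICE sending_service ON QUEUE sending_queue;
-- The target service lists the contract so it can accept dialogs on it
CREATE SERVICE receiving_service ON QUEUE receiving_queue (my_contract);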

Two stored procedures are needed: one to place the message on the sending queue (q_handler) and one to take the message off the receiving queue and process the data whatever way you need to (q_receiver).

I used a table SessionConversations to recycle the conversations used to exchange the messages from one queue to another. This is probably not always necessary but it is recommended to significantly improve performance.

-- This table associates the current connection (@@SPID)
-- with a conversation. The association key also
-- contains the conversation parameters:
-- from service, to service, contract used
--
CREATE TABLE [SessionConversations] (
	SPID INT NOT NULL,
	FromService SYSNAME NOT NULL,
	ToService SYSNAME NOT NULL,
	OnContract SYSNAME NOT NULL,
	Handle UNIQUEIDENTIFIER NOT NULL,
	PRIMARY KEY (SPID, FromService, ToService, OnContract),
	UNIQUE (Handle));
GO

A typical stored procedure to place data onto the sending queue usually uses this table to get a process to handle the message, but things get tricky when you want to do this.
Why?… well, there are a few issues described in admirable detail here, but invalid XML / poison messages were my main problem.
The way this is usually handled is you try a number of times, if it fails then raise an error… well this was not enough for me as we could not risk the queue being down at any time, so the error, all errors, had to be handled in some manner.

I came up with a solution of using a blacklist table. If I trapped an error when trying to process the message (in the receiving stored procedure), I inserted this conversation into the blacklist table. Then, in the sending procedure, I check whether the conversation handle is on the blacklist; if it is, I end the conversation cleanly and start a new one.
The downside of this solution is that the message that caused the error does not get handled automatically. However, I have the message in the blacklist, so I can manually deal with it in my own good time.
I also used an alert on this table to warn me (by email) when a message has been blacklisted. This worked really well for me, as poisoned messages were rare but always handled.
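
The blacklist table itself is not shown in this post. As a rough sketch only: the table name brokerError and the column names tid, errorID, errorContent, posionedXML and handle are taken from the stored procedures in this post, but every data type, the identity key and the extra loggedAt column are assumptions.

```sql
-- Hypothetical definition: only the table and column names come from the
-- procedures in this post; all types and constraints are guesses.
CREATE TABLE [dbo].[brokerError] (
    tid          INT IDENTITY(1,1) NOT NULL PRIMARY KEY,
    errorID      INT NOT NULL,
    errorContent NVARCHAR(MAX) NOT NULL,
    posionedXML  XML NULL,                 -- spelling kept to match the INSERT statements
    handle       UNIQUEIDENTIFIER NULL,    -- conversation handle that was blacklisted
    loggedAt     DATETIME NOT NULL DEFAULT (GETDATE())  -- assumed extra column for triage
);
GO
```

Because the procedures name their columns explicitly in the INSERT, an extra defaulted column like loggedAt does not break them.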

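The stored procedures are pretty self explanatory… One thing to watch: the receiving procedure reads from a queue called pdReceivingQueue and checks for a message type called pdMessage, which do not match the receiving_queue and my_message names in the setup code above. Use whichever names you actually created.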

Sending Procedure

-- SEND procedure. Will lookup to reuse an existing conversation
-- or start a new in case no conversation exists or the conversation
-- cannot be used
--
CREATE PROCEDURE [dbo].[q_handler] (
      @fromService SYSNAME,
      @toService SYSNAME,
      @onContract SYSNAME,
      @messageType SYSNAME,
      @messageBody XML)
AS
BEGIN
      SET NOCOUNT ON;
      DECLARE @handle UNIQUEIDENTIFIER;
      DECLARE @old_handle UNIQUEIDENTIFIER
      DECLARE @counter INT;
      DECLARE @error INT;
      DECLARE @state nvarchar(10);		
 
      SELECT @counter = 1;

      --BEGIN TRANSACTION XACT_q_Handler;
      -- Will need a loop to retry in case the conversation is
      -- in a state that does not allow transmission
      
      WHILE (1=1)
      BEGIN
            -- Seek an eligible conversation in [SessionConversations]

            SELECT @handle = Handle FROM [SessionConversations]  WHERE SPID = @@SPID AND FromService = @fromService AND ToService = @toService AND OnContract = @onContract
			
			IF @handle IS NOT NULL
			BEGIN
				PRINT N'CHECK BLACKLIST';	
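				-- Test for rows directly: SELECT count(...) always returns one row,
				-- so EXISTS around it would be true for every handle
				IF EXISTS (SELECT 1 FROM brokerError WHERE handle = @handle)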
				BEGIN
					SET @old_handle = @handle
			
					PRINT N'IN BLACKLIST SO END CONVERSATION';
					END CONVERSATION @handle

					PRINT N'BEGIN NEW DIALOG CONVERSATION';	
					-- Need to start a new conversation for the current @@spid
					--
					BEGIN DIALOG CONVERSATION @handle FROM SERVICE @fromService TO SERVICE @toService ON CONTRACT @onContract WITH ENCRYPTION = OFF;
					-- Set an one hour timer on the conversation
					BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 3600;

					UPDATE [SessionConversations]
					SET Handle = @handle
					WHERE handle = @old_handle AND SPID = @@SPID AND FromService = @fromService AND ToService = @toService AND OnContract = @onContract 

					PRINT N'End Dialog BL';
				END
				ELSE
				BEGIN
					PRINT N'CHECK HANDLE STATE';	
					SET @state = (Select [state] FROM sys.conversation_endpoints WHERE conversation_handle = @handle) 
					IF @state <> 'CO'
					BEGIN
						SET @old_handle = @handle

						PRINT N'HANDLE STATE IN ERROR!';
						end conversation @handle
						PRINT N'BEGIN NEW DIALOG CONVERSATION';	
						-- Need to start a new conversation for the current @@spid
						--
						BEGIN DIALOG CONVERSATION @handle FROM SERVICE @fromService TO SERVICE @toService ON CONTRACT @onContract WITH ENCRYPTION = OFF;
						-- Set an one hour timer on the conversation
						BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 3600;

						UPDATE [SessionConversations]
						SET Handle = @handle
						WHERE handle = @old_handle AND SPID = @@SPID AND FromService = @fromService AND ToService = @toService AND OnContract = @onContract 

						PRINT N'End Dialog CS';
					END				
				END
			END 
			ELSE
            BEGIN
				PRINT N'BEGIN DIALOG CONVERSATION';	
				-- Need to start a new conversation for the current @@spid
				--
				BEGIN DIALOG CONVERSATION @handle FROM SERVICE @fromService TO SERVICE @toService ON CONTRACT @onContract WITH ENCRYPTION = OFF;
				-- Set an one hour timer on the conversation
				BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 3600;

				INSERT INTO [SessionConversations]
					(SPID, FromService, ToService, OnContract, Handle)
					VALUES
					(@@SPID, @fromService, @toService, @onContract, @handle);

				PRINT N'End Dialog';
            END;
            -- Attempt to SEND on the associated conversation
            --
			PRINT N'SEND ON CONVERSATION';
            SEND ON CONVERSATION @handle
                  MESSAGE TYPE @messageType
                  (@messageBody)

            SELECT @error = @@ERROR

            IF @error = 0
            BEGIN
			      PRINT N'Successful send';
                  -- Successful send, just exit the loop
                  --
                  BREAK;
            END
			
			PRINT N'Error Found';
            SELECT @counter = @counter+1;
            IF @counter > 10
            BEGIN
                  -- We failed 10 times in a  row, something must be broken
                  --
			      PRINT N'Failed to SEND on a conversation for more than 10 times. Error %i.';	
                  RAISERROR (N'Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;
            BREAK;
            END
 
            -- Delete the associated conversation from the table and try again
 
			PRINT N'DELETE HANDLE';	
            DELETE FROM [SessionConversations]
            WHERE Handle = @handle;
            SELECT @handle = NULL;
      END  
END

Receiving Procedure

CREATE PROCEDURE [dbo].[q_receiver]

AS
BEGIN

DECLARE @pID int
DECLARE @error int, @message nvarchar(max)

WHILE (1=1)
BEGIN TRY
		DECLARE @h UNIQUEIDENTIFIER
		DECLARE @msgBody XML
		DECLARE @messageTypeName sysname

		BEGIN TRANSACTION

		WAITFOR(RECEIVE TOP(1) @h = conversation_handle, @messageTypeName = message_type_name, @msgBody = message_body FROM pdReceivingQueue),TIMEOUT 60000;

		IF(@@rowcount = 0)
			BEGIN
			ROLLBACK TRANSACTION 
			BREAK;
		END

		IF N'pdMessage' = @messageTypeName
		BEGIN
			PRINT N'pdMessage';

			SET @pID = @msgBody.value('(/pdMessage/pID)[1]', 'int');

			IF (@pID=1)
			BEGIN
				DECLARE @rID int
				DECLARE @fID int
				DECLARE @responseXML XML
				SET @rID = @msgBody.value('(/pdMessage/rID)[1]', 'int');
				SET @fID = @msgBody.value('(/pdMessage/fID)[1]','int');
				SET @responseXML = @msgBody.query('(/pdMessage/ResponseXML/answers)[1]');
                
                                PRINT N'Call Stored Procedure to process data';
				EXEC rLoad @rID, @fID, @responseXML

			END
			ELSE IF (@pID=2)
			BEGIN
				DECLARE @poll_id int
				DECLARE @user_ip bigint
				DECLARE @poll_type bit
				DECLARE @answer nvarchar(500)
				DECLARE @other_id int
				SET @poll_id = @msgBody.value('(/pdMessage/poll_id)[1]', 'int');
				SET @other_id = @msgBody.value('(/pdMessage/other_id)[1]', 'int');
				SET @user_ip = @msgBody.value('(/pdMessage/user_ip)[1]', 'bigint');
				SET @poll_type = @msgBody.value('(/pdMessage/poll_type)[1]','bit');
				SET @answer = @msgBody.value('(/pdMessage/answer)[1]','nvarchar(500)');

                                PRINT N'Call Stored Procedure to process data';
				EXEC VA_register  @poll_id, @user_ip, @poll_type, @answer, @other_id

			END
		END
		ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer' = @messageTypeName
			BEGIN
			PRINT N'DialogTimer';
			END CONVERSATION @h;
		END
		ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' = @messageTypeName
			BEGIN
			PRINT N'EndDialog';
			END CONVERSATION @h;
		END
		ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' = @messageTypeName
		BEGIN  
			PRINT N'Error';            
			-- Log the received error into ERRORLOG and system Event Log (eventvwr.exe)
			DECLARE @h_string NVARCHAR(100);
			SELECT @h_string = CAST(@h AS NVARCHAR(100)), @message = CAST(@msgBody AS NVARCHAR(max));
			RAISERROR (N'Conversation %s was ended with error %s', 10, 1, @h_string, @message);-- WITH LOG;
			END CONVERSATION @h;
		END
		ELSE
		BEGIN
			PRINT N'Unable to process message.';   
			END CONVERSATION @h WITH ERROR = 127 DESCRIPTION = N'Unable to process message.' ;
		END

	COMMIT TRANSACTION;
END TRY
BEGIN CATCH

	-- Test XACT_STATE for 0, 1, or -1.
	-- If 1, the transaction is committable.
	-- If -1, the transaction is uncommittable and should be rolled back.

	-- Test whether the transaction is uncommittable.
	IF (XACT_STATE()) = -1
	BEGIN
		ROLLBACK TRANSACTION --UndoReceive
		-- take corrective actions
		
		BEGIN TRANSACTION;
			RECEIVE TOP(1) @msgBody = message_body 
			FROM pdReceivingQueue
			where conversation_handle = @h

			IF(@@rowcount = 0)
			BEGIN
				ROLLBACK TRANSACTION 			
			END
			ELSE
			BEGIN
				SELECT @error = ERROR_NUMBER(), @message = ERROR_MESSAGE();

				IF @error = 1205 --DEADLOCK
				BEGIN
					ROLLBACK TRANSACTION 
				END
				ELSE
				BEGIN
					INSERT INTO [dbo].[brokerError] ([errorID],[errorContent],[posionedXML],[handle]) values (ISNULL(@error,'0'),ISNULL(@message,'No error message found!'),@msgBody,@h)	
					COMMIT TRANSACTION
				END
			end
	end

	-- Test whether the transaction is active and valid.
	IF (XACT_STATE()) = 1
	BEGIN
		SELECT @error = ERROR_NUMBER(), @message = ERROR_MESSAGE();
		
		IF @error = 1205 --DEADLOCK
		BEGIN
			ROLLBACK TRANSACTION 
		END
		ELSE
		BEGIN
			end conversation @h with error = @error description = @message
			INSERT INTO [dbo].[brokerError] ([errorID],[errorContent],[posionedXML],[handle]) values (ISNULL(@error,'0'),ISNULL(@message,'No error message found!'),@msgBody,@h)
			COMMIT TRANSACTION
		END
	END

END CATCH;
END

You now need to set up activation on the queue. This tells Service Broker which stored procedure to call when it detects a message on the queue. There is room here for optimization with the number of queue readers, more here.

-- Activation goes on the queue that q_receiver reads from
ALTER QUEUE receiving_queue WITH STATUS=ON, ACTIVATION (
      STATUS = ON,
      MAX_QUEUE_READERS = 2,
      PROCEDURE_NAME = q_receiver,
      EXECUTE AS OWNER
);
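
With the objects, procedures and activation in place, a quick way to try the pipeline is to call the sending procedure by hand. This is a minimal sketch: the service, contract and message type names are the ones from the setup code, the XML element names are taken from the pID = 2 branch of q_receiver, and the values are made up. Note that q_receiver as posted tests for a message type named pdMessage, so the type name you pass has to match whatever the receiver checks for, otherwise the message lands in the "Unable to process message" branch.

```sql
-- Build a test payload; element names follow the pID = 2 branch of q_receiver,
-- the values are invented for illustration.
DECLARE @body XML;
SET @body = N'<pdMessage>
  <pID>2</pID>
  <poll_id>42</poll_id>
  <other_id>0</other_id>
  <user_ip>2130706433</user_ip>
  <poll_type>1</poll_type>
  <answer>Yes</answer>
</pdMessage>';

-- Queue the message; q_handler looks up or creates the conversation for this session.
EXEC [dbo].[q_handler]
    @fromService = N'sending_service',
    @toService   = N'receiving_service',
    @onContract  = N'my_contract',
    @messageType = N'my_message',   -- must match the type name q_receiver tests for
    @messageBody = @body;
```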

You should have a functional broker messaging system now, but what happens if it’s not working? Well, this is my only crib about the service broker platform: I found it difficult to pin down why the broker was not working. One thing I vividly remember, and it had a touch of voodoo about it, was that if I made a change to the stored procedure that is called by the receiving stored procedure, I had to restart the activation on the queue again… never found out why!!
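
A small sketch of how to inspect and restart activation, assuming the activated queue is called receiving_queue (swap in whichever queue you configured). It is also worth checking whether the queue itself has been switched off: Service Broker disables a queue after five consecutive rolled-back receives, which is its built-in poison message protection.

```sql
-- Is the queue enabled, and is activation pointing at the procedure you expect?
SELECT name, is_receive_enabled, is_enqueue_enabled,
       is_activation_enabled, activation_procedure, max_readers
FROM sys.service_queues
WHERE name = N'receiving_queue';

-- Switch activation off and back on (the "restart" described above).
ALTER QUEUE receiving_queue WITH ACTIVATION (STATUS = OFF);
ALTER QUEUE receiving_queue WITH ACTIVATION (STATUS = ON);

-- Re-enable the queue itself if it was disabled, e.g. by repeated rollbacks.
ALTER QUEUE receiving_queue WITH STATUS = ON;
```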

Here are a bunch of selects to help debug your broker.

SELECT * FROM sys.dm_broker_forwarded_messages
SELECT * FROM sys.dm_broker_connections
SELECT * FROM sys.dm_broker_activated_tasks
SELECT * FROM sys.dm_broker_queue_monitors
SELECT * FROM sys.service_broker_endpoints
SELECT * FROM sys.service_contract_message_usages
SELECT * FROM sys.service_contract_usages
SELECT * FROM sys.service_message_types
SELECT * FROM sys.service_queue_usages
SELECT * FROM sys.service_queues
SELECT * FROM sys.services

But I mostly found what I wanted in the error logs in SQL server.
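
Three more queries that tend to narrow things down quickly, offered as a sketch rather than a checklist. The queue name receiving_queue is assumed from the setup code; the catalog views and columns are standard SQL Server ones.

```sql
-- Messages that were sent but not yet delivered; transmission_status
-- holds the reason for the delay when there is one.
SELECT conversation_handle, to_service_name, message_type_name,
       enqueue_time, transmission_status
FROM sys.transmission_queue;

-- State of every conversation endpoint (CO = conversing, ER = error, CD = closed, ...).
SELECT conversation_handle, is_initiator, state, state_desc, far_service
FROM sys.conversation_endpoints;

-- Peek at what is waiting on a queue without removing anything from it.
SELECT conversation_handle, message_type_name,
       CAST(message_body AS XML) AS body
FROM receiving_queue;
```

If a message is stuck in sys.transmission_queue, it never reached the target queue, so the problem is on the sending or routing side rather than in the receiving procedure.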

HANDY TIP: Enable Service Broker on your SQL Server!
Now I know that sounds stupid but hear me out. As a platform, the service broker was great for me but it is a pain in the arse to debug. I worked with this over a year and a half ago, I still remember the pain it took to find the most simple of things… like the broker is not working because you never enabled it in the first place!.
Here’s a guide to do it… but one thing it fails to mention, as do so many of them, is that you will get an error if you just try to alter the database without first killing all processes active on the database… yes, that’s right, you will need to disable SQL Server Agent and take the database temporarily offline!.. yuck!
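
For reference, a minimal sketch of checking and enabling the broker. MyDatabase is a placeholder name. The WITH ROLLBACK IMMEDIATE option is one way around the "other processes are active" problem, since it disconnects other sessions and rolls back their open transactions, so treat it with the same care as taking the database offline.

```sql
-- Check whether Service Broker is enabled for the database (placeholder name).
SELECT name, is_broker_enabled
FROM sys.databases
WHERE name = N'MyDatabase';

-- Enable it. ROLLBACK IMMEDIATE kicks out other sessions and rolls back their
-- open transactions so that the ALTER can take the lock it needs.
ALTER DATABASE [MyDatabase] SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;
```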

To conclude, I found this platform reliable, pretty straightforward to set up and I would recommend it if you have a situation of limited resources and data that can be handled asynchronously.

Feel free to correct my failing memory!.
