What is Microsoft’s SQL service broker?
Its a message queuing platform on Microsoft’s SQL Server. It allows you to queue up jobs for the database to deal with asynchronously.
Why use it?
I’m no DBA, but in my case, I was working with a database server that was at full tilt and there was work that it could be handle asynchronously to spread the load. So there was data that needed processing but it did not need to be handled immediately, like data for some reports. This data could be placed on a queue to be processed by the database server in it’s own good time, evening out the demands on the server.
How does it work?
Well its pretty straightforward yet it looks a bit ridiculous written down. I think this because as a platform it can handle a lot of data messaging scenario’s and as such the syntax and examples usually given are quite generic.
Here’s the official guide, happy mining through all that!
You can find better ones here and here.
Now for my explanation…
( Warning: This is as I remember it working 2 years ago soooo I may be wrong! ).
There is no getting away from the terminology of the parts that make up the service broker. You have to have a gist of them to understand how it all works… again they make more sense (to me) with working with the service broker than any reading about it but anyways… here is the explanation from one of the guides above, but with my additions where I think its a little generic or jargony.
A Message is a piece of information exchanged between applications that use Service Broker. A message can optionally be validated to match a particular XML schema.
The data you are going to process and it can be formatted in XML for easier parsing.
A Conversation is a reliable, long-running, asynchronous exchange of messages.
Simple enough…
A Dialog is a conversation between two services. All Service Broker conversations are dialogs.
The exchange of your data from one table to another.
The Initiator is the participant that begins a dialog.
The stored procedure that initially places data in one queue to be sent to another to get processed.
The Target is the participant that accepts the dialog begun by the initiator.
The stored procedure that takes the data off the receiving queue to process it.
A Conversation Group is a group of related conversations. Every conversation belongs to exactly one conversation group.
Never had much dealings with this…
A Contract is an agreement between two services about the message types allowed in a conversation.
Pretty sure this just ties you to what message you are going to use…
A Queue stores the messages for a particular service.
Just a table for storing your message payload
A Service is a particular business task that can send and receive messages.
A stored procedure to manage and monitor the queues.
So let me give a brief explanation of where we were and what we wanted to do. We used stored procedures for all our data handling, we believed it to be best practice. We later found out its a nightmare to scale but that’s another post.. maybe.
Anyways, the front end of our application would call these stored procedures to process some data, either writing to or reading from the database. Sometimes, the time it took to process exceeded what the front end demanded. So we decided that we could place some of this data on a queue to be processed asynchronously… enter the service broker.
Again I think its important to say that I am no DBA and there may be a thousand other uses and implementations for the service broker, hence the generic terminology I guess.. but here is what I did…
Our service broker implementation:
First off you need to create the parts that make up the service broker. Just use the following code to get started. Here, you will create 2 queues, a queue for sending data and one for receiving messages. You will then create 2 services to manage these queues and lastly a message type that requires valid XML and a contract so the 2 services will now what message type to expect.
CREATE QUEUE receiving_queue CREATE QUEUE sending_queue CREATE Service sending_service ON QUEUE sending_queue CREATE Service receiving_service ON QUEUE receiving_queue CREATE MESSAGE TYPE my_message VALIDATION = WELL_FORMED_XML; CREATE CONTRACT my_contract (my_message SENT BY ANY );
There are 2 stored procedures needed to handle placing the message on the sending queue (q_handler) and one to handle taking the message off the queue and process the data what ever way you need to (q_receiver).
I used a table SessionConversations to recycle the conversations used to exchange the messages from one queue to another. This is probably not always necessary but it is recommended to significantly improve performance.
-- This table associates the current connection (@@SPID) -- with a conversation. The association key also -- contains the conversation parameters: -- from service, to service, contract used -- CREATE TABLE [SessionConversations] ( SPID INT NOT NULL, FromService SYSNAME NOT NULL, ToService SYSNAME NOT NULL, OnContract SYSNAME NOT NULL, Handle UNIQUEIDENTIFIER NOT NULL, PRIMARY KEY (SPID, FromService, ToService, OnContract), UNIQUE (Handle)); GO
A typical stored procedure to place data onto the sending queue usually uses this table to get a process to handle the message, but things get tricky when you want to do this.
Why?… well there are a few issues described in admirable detail here, but invalid XML/ poisoned message were my main problem.
The way this is usually handled is you try a number of times, if it fails then raise an error… well this was not enough for me as we could not risk the queue being down at any time, so the error, all errors, had to be handled in some manner.
I came up with a solution of using a blacklist table. If I trapped an error when trying to process the message (in the receiving stored procedure), I inserted this conversation onto this blacklist table. Then in the sending procedure, I check that the conversation handle is not on the blacklist, if it is I end it cleanly.
The downside of this solution, is the message that caused the error does not get handled automatically, however I have the message in the blacklist so I can manually deal with this in my own good time.
I also used an alert on this table to warn me (by email) when a message has been blacklisted. This worked really well for me, as poisoned messages were rare but always handled.
The stored procedures are pretty self explanatory…
Sending Procedure
-- SEND procedure. Will lookup to reuse an existing conversation -- or start a new in case no conversation exists or the conversation -- cannot be used -- CREATE PROCEDURE [dbo].[q_handler] ( @fromService SYSNAME, @toService SYSNAME, @onContract SYSNAME, @messageType SYSNAME, @messageBody XML) AS BEGIN SET NOCOUNT ON; DECLARE @handle UNIQUEIDENTIFIER; DECLARE @old_handle UNIQUEIDENTIFIER DECLARE @counter INT; DECLARE @error INT; DECLARE @state nvarchar(10); SELECT @counter = 1; --BEGIN TRANSACTION XACT_q_Handler; -- Will need a loop to retry in case the conversation is -- in a state that does not allow transmission WHILE (1=1) BEGIN -- Seek an eligible conversation in [SessionConversations] SELECT @handle = Handle FROM [SessionConversations] WHERE SPID = @@SPID AND FromService = @fromService AND ToService = @toService AND OnContract = @OnContract IF @handle IS NOT NULL BEGIN PRINT N'CHECK BLACKLIST'; IF (EXISTS(SELECT count(tid) FROM brokerError WHERE handle = @handle)) BEGIN SET @old_handle = @handle PRINT N'IN BLACKLIST SO END CONVERSATION'; END CONVERSATION @handle PRINT N'BEGIN NEW DIALOG CONVERSATION'; -- Need to start a new conversation for the current @@spid -- BEGIN DIALOG CONVERSATION @handle FROM SERVICE @fromService TO SERVICE @toService ON CONTRACT @onContract WITH ENCRYPTION = OFF; -- Set an one hour timer on the conversation BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 3600; UPDATE [SessionConversations] SET Handle = @handle WHERE handle = @old_handle AND SPID = @@SPID AND FromService = @fromService AND ToService = @toService AND OnContract = @onContract PRINT N'End Dialog BL'; END ELSE BEGIN PRINT N'CHECK HANDLE STATE'; SET @state = (Select [state] FROM sys.conversation_endpoints WHERE conversation_handle = @handle) IF @state <> 'CO' BEGIN SET @old_handle = @handle PRINT N'HANDLE STATE IN ERROR!'; end conversation @handle PRINT N'BEGIN NEW DIALOG CONVERSATION'; -- Need to start a new conversation for the current @@spid -- BEGIN DIALOG CONVERSATION @handle FROM SERVICE @fromService TO SERVICE @toService ON CONTRACT @onContract WITH ENCRYPTION = OFF; -- Set an one hour timer on the conversation BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 3600; UPDATE [SessionConversations] SET Handle = @handle WHERE handle = @old_handle AND SPID = @@SPID AND FromService = @fromService AND ToService = @toService AND OnContract = @onContract PRINT N'End Dialog CS'; END END END ELSE BEGIN PRINT N'BEGIN DIALOG CONVERSATION'; -- Need to start a new conversation for the current @@spid -- BEGIN DIALOG CONVERSATION @handle FROM SERVICE @fromService TO SERVICE @toService ON CONTRACT @onContract WITH ENCRYPTION = OFF; -- Set an one hour timer on the conversation BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 3600; INSERT INTO [SessionConversations] (SPID, FromService, ToService, OnContract, Handle) VALUES (@@SPID, @fromService, @toService, @onContract, @handle); PRINT N'End Dialog'; END; -- Attempt to SEND on the associated conversation -- PRINT N'SEND ON CONVERSATION'; SEND ON CONVERSATION @handle MESSAGE TYPE @messageType (@messageBody) SELECT @error = @@ERROR IF @error = 0 BEGIN PRINT N'Successful send'; -- Successful send, just exit the loop -- BREAK; END PRINT N'Error Found'; SELECT @counter = @counter+1; IF @counter > 10 BEGIN -- We failed 10 times in a row, something must be broken -- PRINT N'Failed to SEND on a conversation for more than 10 times. Error %i.'; RAISERROR (N'Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG; BREAK; END -- Delete the associated conversation from the table and try again PRINT N'DELETE HANDLE'; DELETE FROM [SessionConversations] WHERE Handle = @handle; SELECT @handle = NULL; END END
Receiving Procedure
CREATE PROCEDURE [dbo].[q_receiver] AS BEGIN DECLARE @pID int DECLARE @error int, @message nvarchar(max) WHILE (1=1) BEGIN TRY DECLARE @h UNIQUEIDENTIFIER DECLARE @msgBody XML DECLARE @messageTypeName sysname BEGIN TRANSACTION WAITFOR(RECEIVE TOP(1) @h = conversation_handle, @messageTypeName = message_type_name, @msgBody = message_body FROM pdReceivingQueue),TIMEOUT 60000; IF(@@rowcount = 0) BEGIN ROLLBACK TRANSACTION BREAK; END IF N'pdMessage' = @messageTypeName BEGIN PRINT N'pdMessage'; SET @pID = @msgBody.value('(/pdMessage/pID)[1]', 'int'); IF (@pID=1) BEGIN DECLARE @rID int DECLARE @fID int DECLARE @responseXML XML SET @rID = @msgBody.value('(/pdMessage/rID)[1]', 'int'); SET @fID = @msgBody.value('(/pdMessage/fID)[1]','int'); SET @responseXML = @msgBody.query('(/pdMessage/ResponseXML/answers)[1]'); PRINT N'Call Stored Procedure to process data'; EXEC rLoad @rID, @fID, @responseXML END ELSE IF (@pID=2) BEGIN DECLARE @poll_id int DECLARE @user_ip bigint DECLARE @poll_type bit DECLARE @answer nvarchar(500) DECLARE @other_id int SET @poll_id = @msgBody.value('(/pdMessage/poll_id)[1]', 'int'); SET @other_id = @msgBody.value('(/pdMessage/other_id)[1]', 'int'); SET @user_ip = @msgBody.value('(/pdMessage/user_ip)[1]', 'bigint'); SET @poll_type = @msgBody.value('(/pdMessage/poll_type)[1]','bit'); SET @answer = @msgBody.value('(/pdMessage/answer)[1]','nvarchar(500)'); PRINT N'Call Stored Procedure to process data'; EXEC VA_register @poll_id, @user_ip, @poll_type, @answer, @other_id END END ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer' = @messageTypeName BEGIN PRINT N'EndDialog'; END CONVERSATION @h; END ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' = @messageTypeName BEGIN PRINT N'EndDialog'; END CONVERSATION @h; END ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' = @messageTypeName BEGIN PRINT N'Error'; -- Log the received error into ERRORLOG and system Event Log (eventvwr.exe) DECLARE @h_string NVARCHAR(100); SELECT @h_string = CAST(@h AS NVARCHAR(100)), @message = CAST(@msgBody AS NVARCHAR(max)); RAISERROR (N'Conversation %s was ended with error %s', 10, 1, @h_string, @message);-- WITH LOG; END CONVERSATION @h; END ELSE BEGIN PRINT N'Unable to process message.'; END CONVERSATION @h WITH ERROR = 127 DESCRIPTION = N'Unable to process message.' ; END COMMIT TRANSACTION; END TRY BEGIN CATCH -- Test XACT_STATE for 0, 1, or -1. -- If 1, the transaction is committable. -- If -1, the transaction is uncommittable and should be rolled back. -- Test whether the transaction is uncommittable. IF (XACT_STATE()) = -1 BEGIN ROLLBACK TRANSACTION --UndoReceive -- take corrective actions BEGIN TRANSACTION; RECEIVE TOP(1) @msgBody = message_body FROM pdReceivingQueue where conversation_handle = @h IF(@@rowcount = 0) BEGIN ROLLBACK TRANSACTION END ELSE BEGIN SELECT @error = ERROR_NUMBER(), @message = ERROR_MESSAGE(); IF @error = 1205 --DEADLOCK BEGIN ROLLBACK TRANSACTION END ELSE BEGIN INSERT INTO [dbo].[brokerError] ([errorID],[errorContent],[posionedXML],[handle]) values (ISNULL(@error,'0'),ISNULL(@message,'No error message found!'),@msgBody,@h) COMMIT TRANSACTION END end end -- Test wether the transaction is active and valid. IF (XACT_STATE()) = 1 BEGIN SELECT @error = ERROR_NUMBER(), @message = ERROR_MESSAGE(); IF @error = 1205 --DEADLOCK BEGIN ROLLBACK TRANSACTION END ELSE BEGIN end conversation @h with error = @error description = @message INSERT INTO [dbo].[brokerError] ([errorID],[errorContent],[posionedXML],[handle]) values (ISNULL(@error,'0'),ISNULL(@message,'No error message found!'),@msgBody,@h) COMMIT TRANSACTION END END END CATCH; END
You need to now set up activation on the queue, so this basically tells the service that when you detect a message on the , what stored procedure you call to process it. There is room here for optimization with the number of queue readers, more here.
ALTER QUEUE sending_queue WITH STATUS=ON, ACTIVATION ( STATUS = ON, MAX_QUEUE_READERS = 2, PROCEDURE_NAME = q_receiver, EXECUTE AS OWNER );
You should have a functional broker messaging system now but what happens if its not working. Well this is my only crib about the service broker platform, I found it difficult to pin down why the broker was not working. One thing I vividly remember and it had a touch of voodoo about it, was if I made a change to the stored procedure that is called by the receiving stored procedure, I had to restart the activation on the queue again… never found out why!!
Here are a bunch of selects to help debug your broker.
SELECT * FROM sys.dm_broker_forwarded_messages SELECT * FROM sys.dm_broker_connections SELECT * FROM sys.dm_broker_activated_tasks SELECT * FROM sys.dm_broker_queue_monitors SELECT * FROM sys.service_broker_endpoints SELECT * FROM sys.service_contract_message_usages SELECT * FROM sys.service_contract_usages SELECT * FROM sys.service_message_types SELECT * FROM sys.service_queue_usages SELECT * FROM sys.service_queues SELECT * FROM sys.services
But I mostly found what I wanted in the error logs in SQL server.
HANDY TIP: Enable Service Broker on you SQL Server!
Now I know that sounds stupid but hear me out. As a platform, the service broker was great for me but it is a pain in the arse to debug. I worked with this over a year and a half ago, I still remember the pain it took to find the most simple of things… like the broker is not working because you never enabled it in the first place!.
Here’s a guide to do it… but one thing it fails to mention as do so many of them, you will get an error if you just try to alter the database without first killing all processes active on the database… yes that’s right, you will need to disable SQL server agent and take the database temporarily offline!.. yuck!
To conclude, I found this platform reliable, pretty straightforward to set up and I would recommend it if you have a situation of limited resources and data that can be handled asynchronously.
Feel free to correct my failing memory!.