What is Microsoft’s SQL service broker?
It’s a message queuing platform built into Microsoft’s SQL Server. It allows you to queue up jobs for the database to deal with asynchronously.
Why use it?
I’m no DBA, but in my case, I was working with a database server that was at full tilt and there was work that it could handle asynchronously to spread the load. So there was data that needed processing but it did not need to be handled immediately, like data for some reports. This data could be placed on a queue to be processed by the database server in its own good time, evening out the demands on the server.
How does it work?
Well it’s pretty straightforward, yet it looks a bit ridiculous written down. I think this is because as a platform it can handle a lot of data messaging scenarios, and as such the syntax and examples usually given are quite generic.
Here’s the official guide, happy mining through all that!
You can find better ones here and here.
Now for my explanation…
( Warning: This is as I remember it working 2 years ago soooo I may be wrong! ).
There is no getting away from the terminology of the parts that make up the service broker. You have to have a gist of them to understand how it all works… again they make more sense (to me) when working with the service broker than from any reading about it but anyways… here is the explanation from one of the guides above, but with my additions where I think it’s a little generic or jargony.
A Message is a piece of information exchanged between applications that use Service Broker. A message can optionally be validated to match a particular XML schema.
This is the data you are going to process; it can be formatted as XML for easier parsing.
A Conversation is a reliable, long-running, asynchronous exchange of messages.
Simple enough…
A Dialog is a conversation between two services. All Service Broker conversations are dialogs.
The exchange of your data from one table to another.
The Initiator is the participant that begins a dialog.
The stored procedure that initially places data in one queue to be sent to another to get processed.
The Target is the participant that accepts the dialog begun by the initiator.
The stored procedure that takes the data off the receiving queue to process it.
A Conversation Group is a group of related conversations. Every conversation belongs to exactly one conversation group.
Never had much dealings with this…
A Contract is an agreement between two services about the message types allowed in a conversation.
Pretty sure this just ties you to what message you are going to use…
A Queue stores the messages for a particular service.
Just a table for storing your message payload
A Service is a particular business task that can send and receive messages.
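Despite how it sounds, this is not a stored procedure: a service is a named endpoint tied to a queue. You address messages to a service and they land on its queue.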
So let me give a brief explanation of where we were and what we wanted to do. We used stored procedures for all our data handling, as we believed it to be best practice. We later found out it’s a nightmare to scale but that’s another post… maybe.
Anyways, the front end of our application would call these stored procedures to process some data, either writing to or reading from the database. Sometimes, the time it took to process exceeded what the front end demanded. So we decided that we could place some of this data on a queue to be processed asynchronously… enter the service broker.
Again I think it’s important to say that I am no DBA and there may be a thousand other uses and implementations for the service broker, hence the generic terminology I guess… but here is what I did…
Our service broker implementation:
First off you need to create the parts that make up the service broker. Just use the following code to get started. Here, you will create a message type that requires well-formed XML and a contract so the 2 services will know what message type to expect. You will then create 2 queues, one for sending data and one for receiving messages, and lastly 2 services to manage these queues. The receiving service has to list the contract, otherwise it cannot accept conversations on it.
[source language="sql"]
CREATE MESSAGE TYPE my_message VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT my_contract (my_message SENT BY ANY);
CREATE QUEUE receiving_queue;
CREATE QUEUE sending_queue;
CREATE SERVICE sending_service ON QUEUE sending_queue;
-- The target service must name the contract(s) it accepts
CREATE SERVICE receiving_service ON QUEUE receiving_queue (my_contract);
[/source]
Two stored procedures are needed: one to place the message on the sending queue (q_handler) and one to take the message off the queue and process the data whatever way you need to (q_receiver).
I used a table SessionConversations to recycle the conversations used to exchange the messages from one queue to another. This is probably not always necessary but it is recommended to significantly improve performance.
[source language="sql"]
-- This table associates the current connection (@@SPID)
-- with a conversation. The association key also
-- contains the conversation parameters:
-- from service, to service, contract used
--
CREATE TABLE [SessionConversations] (
    SPID INT NOT NULL,
    FromService SYSNAME NOT NULL,
    ToService SYSNAME NOT NULL,
    OnContract SYSNAME NOT NULL,
    Handle UNIQUEIDENTIFIER NOT NULL,
    PRIMARY KEY (SPID, FromService, ToService, OnContract),
    UNIQUE (Handle));
GO
[/source]
A typical stored procedure to place data onto the sending queue uses this table to find a conversation to send the message on, but things get tricky when you want to do this.
Why?… well there are a few issues described in admirable detail here, but invalid XML / poisoned messages were my main problem.
The way this is usually handled is you try a number of times, if it fails then raise an error… well this was not enough for me as we could not risk the queue being down at any time, so the error, all errors, had to be handled in some manner.
I came up with a solution of using a blacklist table. If I trapped an error when trying to process the message (in the receiving stored procedure), I inserted this conversation onto this blacklist table. Then in the sending procedure, I check that the conversation handle is not on the blacklist, if it is I end it cleanly.
The downside of this solution is that the message that caused the error does not get handled automatically. However, I have the message in the blacklist so I can manually deal with it in my own good time.
I also used an alert on this table to warn me (by email) when a message has been blacklisted. This worked really well for me, as poisoned messages were rare but always handled.
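I never posted the blacklist table itself, so here is a rough sketch of what it could look like. The column names (tid, errorID, errorContent, posionedXML, handle) are the ones the stored procedures further down use; the types, sizes, the loggedAt column and the index name are my assumptions, so adjust to taste.

[source language="sql"]
-- Hypothetical definition of the blacklist table.
-- Column names match the procedures in this post; types are guesses.
CREATE TABLE [dbo].[brokerError] (
    tid          INT IDENTITY(1,1) NOT NULL PRIMARY KEY,
    errorID      INT NOT NULL,
    errorContent NVARCHAR(MAX) NOT NULL,
    posionedXML  XML NULL,                 -- the message body that failed
    handle       UNIQUEIDENTIFIER NULL,    -- conversation handle
    loggedAt     DATETIME NOT NULL DEFAULT (GETDATE())  -- assumed extra column
);
GO
-- The sending procedure looks rows up by handle, so index it (assumed name)
CREATE INDEX IX_brokerError_handle ON [dbo].[brokerError] (handle);
GO
[/source]

The default on loggedAt means the INSERT statements in the receiving procedure do not need to change.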
The stored procedures are pretty self explanatory…
Sending Procedure
[source language="sql"]
-- SEND procedure. Will look up an existing conversation to reuse,
-- or start a new one in case no conversation exists or the conversation
-- cannot be used
--
CREATE PROCEDURE [dbo].[q_handler] (
    @fromService SYSNAME,
    @toService SYSNAME,
    @onContract SYSNAME,
    @messageType SYSNAME,
    @messageBody XML)
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @handle UNIQUEIDENTIFIER;
    DECLARE @old_handle UNIQUEIDENTIFIER;
    DECLARE @counter INT;
    DECLARE @error INT;
    DECLARE @state NVARCHAR(10);
    SELECT @counter = 1;
    --BEGIN TRANSACTION XACT_q_Handler;
    -- Will need a loop to retry in case the conversation is
    -- in a state that does not allow transmission
    WHILE (1=1)
    BEGIN
        -- Seek an eligible conversation in [SessionConversations]
        SELECT @handle = Handle FROM [SessionConversations]
        WHERE SPID = @@SPID AND FromService = @fromService AND ToService = @toService AND OnContract = @onContract;
        IF @handle IS NOT NULL
        BEGIN
            PRINT N'CHECK BLACKLIST';
            -- EXISTS over SELECT COUNT(...) is always true (COUNT always
            -- returns a row), so test for an actual matching row instead
            IF EXISTS (SELECT 1 FROM brokerError WHERE handle = @handle)
            BEGIN
                SET @old_handle = @handle;
                PRINT N'IN BLACKLIST SO END CONVERSATION';
                END CONVERSATION @handle;
                PRINT N'BEGIN NEW DIALOG CONVERSATION';
                -- Need to start a new conversation for the current @@spid
                --
                BEGIN DIALOG CONVERSATION @handle FROM SERVICE @fromService TO SERVICE @toService ON CONTRACT @onContract WITH ENCRYPTION = OFF;
                -- Set a one hour timer on the conversation
                BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 3600;
                UPDATE [SessionConversations]
                SET Handle = @handle
                WHERE Handle = @old_handle AND SPID = @@SPID AND FromService = @fromService AND ToService = @toService AND OnContract = @onContract;
                PRINT N'End Dialog BL';
            END
            ELSE
            BEGIN
                PRINT N'CHECK HANDLE STATE';
                SET @state = (SELECT [state] FROM sys.conversation_endpoints WHERE conversation_handle = @handle);
                -- 'CO' = CONVERSING; anything else cannot be sent on
                IF @state <> 'CO'
                BEGIN
                    SET @old_handle = @handle;
                    PRINT N'HANDLE STATE IN ERROR!';
                    END CONVERSATION @handle;
                    PRINT N'BEGIN NEW DIALOG CONVERSATION';
                    -- Need to start a new conversation for the current @@spid
                    --
                    BEGIN DIALOG CONVERSATION @handle FROM SERVICE @fromService TO SERVICE @toService ON CONTRACT @onContract WITH ENCRYPTION = OFF;
                    -- Set a one hour timer on the conversation
                    BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 3600;
                    UPDATE [SessionConversations]
                    SET Handle = @handle
                    WHERE Handle = @old_handle AND SPID = @@SPID AND FromService = @fromService AND ToService = @toService AND OnContract = @onContract;
                    PRINT N'End Dialog CS';
                END
            END
        END
        ELSE
        BEGIN
            PRINT N'BEGIN DIALOG CONVERSATION';
            -- Need to start a new conversation for the current @@spid
            --
            BEGIN DIALOG CONVERSATION @handle FROM SERVICE @fromService TO SERVICE @toService ON CONTRACT @onContract WITH ENCRYPTION = OFF;
            -- Set a one hour timer on the conversation
            BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 3600;
            INSERT INTO [SessionConversations]
                (SPID, FromService, ToService, OnContract, Handle)
            VALUES
                (@@SPID, @fromService, @toService, @onContract, @handle);
            PRINT N'End Dialog';
        END;
        -- Attempt to SEND on the associated conversation
        --
        PRINT N'SEND ON CONVERSATION';
        SEND ON CONVERSATION @handle
            MESSAGE TYPE @messageType
            (@messageBody);
        SELECT @error = @@ERROR;
        IF @error = 0
        BEGIN
            PRINT N'Successful send';
            -- Successful send, just exit the loop
            --
            BREAK;
        END
        PRINT N'Error Found';
        SELECT @counter = @counter + 1;
        IF @counter > 10
        BEGIN
            -- We failed 10 times in a row, something must be broken
            --
            PRINT N'Failed to SEND on a conversation for more than 10 times.';
            RAISERROR (N'Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;
            BREAK;
        END
        -- Delete the associated conversation from the table and try again
        PRINT N'DELETE HANDLE';
        DELETE FROM [SessionConversations]
        WHERE Handle = @handle;
        SELECT @handle = NULL;
    END
END
[/source]
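For what it’s worth, calling the sending procedure looks something like the sketch below. The service, contract and message type names are the ones from the setup code earlier; the XML shape follows the second branch (pID = 2) of my receiving procedure, and the values are made up. Note the receiving procedure as posted checks for a message type called pdMessage, so either send that type or change the check to match my_message.

[source language="sql"]
-- Example call to q_handler (illustrative values only)
DECLARE @body XML;
SET @body = N'<pdMessage>
  <pID>2</pID>
  <poll_id>42</poll_id>
  <other_id>0</other_id>
  <user_ip>2130706433</user_ip>
  <poll_type>1</poll_type>
  <answer>Yes</answer>
</pdMessage>';

EXEC [dbo].[q_handler]
    @fromService = N'sending_service',
    @toService   = N'receiving_service',
    @onContract  = N'my_contract',
    @messageType = N'my_message',
    @messageBody = @body;
[/source]

The front end only waits for the SEND, not for the processing, which was the whole point of the exercise.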
Receiving Procedure
[source language="sql"]
-- Note: pdReceivingQueue and pdMessage are the names from my real system;
-- with the setup code above they would be receiving_queue and my_message.
CREATE PROCEDURE [dbo].[q_receiver]
AS
BEGIN
    DECLARE @pID int;
    DECLARE @error int, @message nvarchar(max);
    DECLARE @h UNIQUEIDENTIFIER;
    DECLARE @msgBody XML;
    DECLARE @messageTypeName sysname;
    WHILE (1=1)
    BEGIN
        BEGIN TRY
            BEGIN TRANSACTION;
            WAITFOR(RECEIVE TOP(1) @h = conversation_handle, @messageTypeName = message_type_name, @msgBody = message_body FROM pdReceivingQueue), TIMEOUT 60000;
            IF (@@ROWCOUNT = 0)
            BEGIN
                -- Nothing arrived within 60 seconds, so stop
                ROLLBACK TRANSACTION;
                BREAK;
            END
            IF N'pdMessage' = @messageTypeName
            BEGIN
                PRINT N'pdMessage';
                SET @pID = @msgBody.value('(/pdMessage/pID)[1]', 'int');
                IF (@pID = 1)
                BEGIN
                    DECLARE @rID int;
                    DECLARE @fID int;
                    DECLARE @responseXML XML;
                    SET @rID = @msgBody.value('(/pdMessage/rID)[1]', 'int');
                    SET @fID = @msgBody.value('(/pdMessage/fID)[1]', 'int');
                    SET @responseXML = @msgBody.query('(/pdMessage/ResponseXML/answers)[1]');
                    PRINT N'Call Stored Procedure to process data';
                    EXEC rLoad @rID, @fID, @responseXML;
                END
                ELSE IF (@pID = 2)
                BEGIN
                    DECLARE @poll_id int;
                    DECLARE @user_ip bigint;
                    DECLARE @poll_type bit;
                    DECLARE @answer nvarchar(500);
                    DECLARE @other_id int;
                    SET @poll_id = @msgBody.value('(/pdMessage/poll_id)[1]', 'int');
                    SET @other_id = @msgBody.value('(/pdMessage/other_id)[1]', 'int');
                    SET @user_ip = @msgBody.value('(/pdMessage/user_ip)[1]', 'bigint');
                    SET @poll_type = @msgBody.value('(/pdMessage/poll_type)[1]', 'bit');
                    SET @answer = @msgBody.value('(/pdMessage/answer)[1]', 'nvarchar(500)');
                    PRINT N'Call Stored Procedure to process data';
                    EXEC VA_register @poll_id, @user_ip, @poll_type, @answer, @other_id;
                END
            END
            ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer' = @messageTypeName
            BEGIN
                PRINT N'DialogTimer';
                END CONVERSATION @h;
            END
            ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' = @messageTypeName
            BEGIN
                PRINT N'EndDialog';
                END CONVERSATION @h;
            END
            ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' = @messageTypeName
            BEGIN
                PRINT N'Error';
                -- Report the received error (add WITH LOG to also write it to
                -- ERRORLOG and the system Event Log, eventvwr.exe)
                DECLARE @h_string NVARCHAR(100);
                SELECT @h_string = CAST(@h AS NVARCHAR(100)), @message = CAST(@msgBody AS NVARCHAR(max));
                RAISERROR (N'Conversation %s was ended with error %s', 10, 1, @h_string, @message); -- WITH LOG;
                END CONVERSATION @h;
            END
            ELSE
            BEGIN
                PRINT N'Unable to process message.';
                END CONVERSATION @h WITH ERROR = 127 DESCRIPTION = N'Unable to process message.';
            END
            COMMIT TRANSACTION;
        END TRY
        BEGIN CATCH
            -- Test XACT_STATE for 0, 1, or -1.
            -- If 1, the transaction is committable.
            -- If -1, the transaction is uncommittable and should be rolled back.
            -- Test whether the transaction is uncommittable.
            IF (XACT_STATE()) = -1
            BEGIN
                ROLLBACK TRANSACTION; -- undo the RECEIVE
                -- Take corrective action: pull the message off again
                -- so it can be parked on the blacklist
                BEGIN TRANSACTION;
                RECEIVE TOP(1) @msgBody = message_body
                FROM pdReceivingQueue
                WHERE conversation_handle = @h;
                IF (@@ROWCOUNT = 0)
                BEGIN
                    ROLLBACK TRANSACTION;
                END
                ELSE
                BEGIN
                    SELECT @error = ERROR_NUMBER(), @message = ERROR_MESSAGE();
                    IF @error = 1205 -- DEADLOCK: put the message back and retry
                    BEGIN
                        ROLLBACK TRANSACTION;
                    END
                    ELSE
                    BEGIN
                        INSERT INTO [dbo].[brokerError] ([errorID], [errorContent], [posionedXML], [handle])
                        VALUES (ISNULL(@error, 0), ISNULL(@message, N'No error message found!'), @msgBody, @h);
                        COMMIT TRANSACTION;
                    END
                END
            END
            -- Test whether the transaction is active and valid.
            IF (XACT_STATE()) = 1
            BEGIN
                SELECT @error = ERROR_NUMBER(), @message = ERROR_MESSAGE();
                IF @error = 1205 -- DEADLOCK: put the message back and retry
                BEGIN
                    ROLLBACK TRANSACTION;
                END
                ELSE
                BEGIN
                    END CONVERSATION @h WITH ERROR = @error DESCRIPTION = @message;
                    INSERT INTO [dbo].[brokerError] ([errorID], [errorContent], [posionedXML], [handle])
                    VALUES (ISNULL(@error, 0), ISNULL(@message, N'No error message found!'), @msgBody, @h);
                    COMMIT TRANSACTION;
                END
            END
        END CATCH;
    END
END
[/source]
You now need to set up activation on the queue. This basically tells the broker which stored procedure to call to process a message when it detects one on the queue. There is room here for optimization with the number of queue readers, more here.
[source language="sql"]
-- Activation goes on the queue that q_receiver reads from
ALTER QUEUE receiving_queue WITH STATUS = ON, ACTIVATION (
    STATUS = ON,
    MAX_QUEUE_READERS = 2,
    PROCEDURE_NAME = q_receiver,
    EXECUTE AS OWNER
);
[/source]
You should have a functional broker messaging system now, but what happens if it’s not working? Well this is my only crib about the service broker platform: I found it difficult to pin down why the broker was not working. One thing I vividly remember, and it had a touch of voodoo about it, was that if I made a change to the stored procedure that is called by the receiving stored procedure, I had to restart the activation on the queue again… never found out why!!
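Restarting activation is just a matter of flipping it off and on again. Here is a rough sketch, assuming the activated queue is the receiving_queue from the setup code (swap in your own queue name). The first query is worth running too: if the receiving procedure rolls back the same message 5 times in a row, SQL Server’s poison message handling disables the queue, and is_receive_enabled will show 0.

[source language="sql"]
-- Is the queue enabled, and is activation switched on?
SELECT name, is_receive_enabled, is_enqueue_enabled,
       is_activation_enabled, activation_procedure, max_readers
FROM sys.service_queues;

-- Restart activation on the queue (queue name is an assumption)
ALTER QUEUE receiving_queue WITH ACTIVATION (STATUS = OFF);
ALTER QUEUE receiving_queue WITH ACTIVATION (STATUS = ON);

-- Re-enable the queue itself if it was disabled
ALTER QUEUE receiving_queue WITH STATUS = ON;
[/source]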
Here are a bunch of selects to help debug your broker.
[source language="sql"]
SELECT * FROM sys.dm_broker_forwarded_messages
SELECT * FROM sys.dm_broker_connections
SELECT * FROM sys.dm_broker_activated_tasks
SELECT * FROM sys.dm_broker_queue_monitors
SELECT * FROM sys.service_broker_endpoints
SELECT * FROM sys.service_contract_message_usages
SELECT * FROM sys.service_contract_usages
SELECT * FROM sys.service_message_types
SELECT * FROM sys.service_queue_usages
SELECT * FROM sys.service_queues
SELECT * FROM sys.services
[/source]
But I mostly found what I wanted in the error logs in SQL server.
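A few more queries that, as far as I can tell, answer the usual question of “where did my message go?”. This is only a sketch: receiving_queue is the queue name from the setup code, so substitute your own. Messages that could not be delivered sit in sys.transmission_queue, and the transmission_status column usually says why.

[source language="sql"]
-- Messages stuck on their way out, with the reason
SELECT to_service_name, message_type_name, transmission_status, enqueue_time
FROM sys.transmission_queue;

-- Peek at messages waiting on the queue without removing them
SELECT TOP (10) conversation_handle, message_type_name,
       CAST(message_body AS XML) AS body
FROM receiving_queue WITH (NOLOCK);

-- State of every conversation endpoint in this database
SELECT conversation_handle, is_initiator, state_desc, far_service
FROM sys.conversation_endpoints;
[/source]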
HANDY TIP: Enable Service Broker on your SQL Server!
Now I know that sounds stupid but hear me out. As a platform, the service broker was great for me but it is a pain in the arse to debug. I worked with this over a year and a half ago, and I still remember the pain it took to find the most simple of things… like the broker is not working because you never enabled it in the first place!
Here’s a guide to do it… but one thing it fails to mention as do so many of them, you will get an error if you just try to alter the database without first killing all processes active on the database… yes that’s right, you will need to disable SQL server agent and take the database temporarily offline!.. yuck!
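For reference, checking and enabling the broker looks roughly like this. MyDatabase is a placeholder name. The WITH ROLLBACK IMMEDIATE option is one I did not try at the time: it rolls back other sessions’ open transactions and disconnects them so the ALTER can get its lock, which may save taking the database offline, but treat that as an assumption to test on a non-production box first.

[source language="sql"]
-- Is Service Broker enabled on the database? (1 = yes)
SELECT name, is_broker_enabled
FROM sys.databases
WHERE name = N'MyDatabase';

-- Enable it, kicking out other sessions rather than waiting on them
ALTER DATABASE [MyDatabase] SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;
[/source]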
To conclude, I found this platform reliable, pretty straightforward to set up and I would recommend it if you have a situation of limited resources and data that can be handled asynchronously.
Feel free to correct my failing memory!
