Service Broker 可以创建异步的、数据驱动的消息应用程序,它允许一个数据库发送消息到其他数据库,而不需要等待响应,即使远程数据库不能立即处理这些消息,发送数据库也可以继续其他操作。通过使用T-SQL对象和命令,就可以完成管理Service Broker。
Service Broker 为SQL Server提供消息队列,这样就可以从数据库中发送异步事务性消息到队列,在队列中这些消息将会被其他服务获取和处理,该服务可能运行在其他数据库、或服务器上。另外,对于异步程序,发送一条消息,并且应用程序不需要等待原始消息已被接收、或处理的确认信息,就可以处理其他相关的任务。一旦完成特定任务,两个Service Broker 服务之间的会话就可以显式的结束。
Service Broker包含了一些开箱即用的特性,它处理当试图创建自己的异步消息系统时,可能经常遇到的复杂问题。
比如:
首先,Service Broker 消息要保证以适当的顺序,或者以他们发送的原始顺序进行接收,而且这些消息也只能被接收一次(调度程序保证不会重复的读取),并且可以作为同一个会话的一部分发送,与任务的同一个实例相关;
其次,Service Broker 保证了消息的发送,当尝试发送第一个消息时,目标数据库(消息的接收者)不可用,消息将加入到发送方的数据库的队列中,当目标数据库变为可用时,发送者将会尝试发送这个消息;
再次,由于Service Broker 是内建在SQL Server数据库中的,可以与数据库的其余部分仪器备份,所以这些消息在数据库发生故障的情况下也是可以恢复的;
最后使用Service Broker 的事件通知功能,可以跟踪数据库和SQL Server实例的事件,这个与SQL Trace相似,但事件通知是异步的,并且对SQL Server实例整体的性能的影响最小,而SQL Trace对性能的影响较大。
use master go if not exists(select 1 from sys.databases where name = 'BOOKSTORE') CREATE DATABASE bookstore go if not exists(select 1 from sys.databases where name = 'bookDistribution') CREATE DATABASE bookDistribution go --1.启动Service broker alter database bookstore set enable_broker --disable_broker可以禁用 alter database bookstore set trustworthy on --指明SQL Server实例是否信任该数据库以及其中的内容 alter database bookDistribution set enable_broker alter database bookDistribution set trustworthy on --2.创建数据库主密钥 use bookstore go create master key encryption by password = '123abc' use bookDistribution go create master key encryption by password = 'abc123' --3.创建消息类型,定义了从Service Broker端点发送的消息中包含的数据类型 use bookstore go create message type [//SackConsulting/SendBookOrder] validation = WELL_FORMED_XML go create message type [//SackConsulting/BookOrderReceived] validation = WELL_FORMED_XML use bookDistribution go create message type [//SackConsulting/SendBookOrder] validation = well_formed_xml go create message type [//SackConsulting/BookOrderReceived] validation = well_formed_xml go --4.创建约定,定义了在任务级别可以发送或者接收的消息类型 use bookstore go create contract [//SackConsulting/BookOrderContract] ( [//SackConsulting/SendBookOrder] sent by initiator, --可以由会话的发起方发送的消息类型 [//SackConsulting/BookOrderReceived] sent by target --可以由会话的目标方发送的消息类型 ) use bookDistribution go create contract [//SackConsulting/BookOrderContract] ( [//SackConsulting/SendBookOrder] sent by initiator, [//SackConsulting/BookOrderReceived] sent by target ) go --5.创建队列,队列用来保存数据 --通过select语句来查询队列,或者用receive命令从队列检索一条、多条消息 --检索程序可以是外部的.net程序,不过通过存储过程来实现更方便 use bookstore go create queue bookStoreQueue with status = on go --在创建queue时可以把自动处理消息的程序,绑定到队列的激活选项, --此处通过手动控制队列中的信息交换 use bookDistribution go create queue bookStoreDistributionQueue with status = on go --6.创建服务这样就可以把消息队列绑定到一个或者多个约定上 --服务使用队列和约定来定义一个或一组任务 --服务是消息的发起方和接收方强制约定的规则,并将消息路由到正确的队列 use bookstore go create service [//SackConsulting/BookOrderService] on queue dbo.bookStoreQueue ([//SackConsulting/BookOrderContract]) use bookDistribution go create service [//SackConsulting/BookStoreDistributionService] on queue dbo.bookStoreDistributionQueue ([//SackConsulting/BookOrderContract]) --7.1开始会话,发送消息 use bookstore go declare @conversation_handler uniqueidentifier; declare @order_msg xml; begin dialog conversation @conversation_handler from service [//SackConsulting/BookOrderService] to service '//SackConsulting/BookStoreDistributionService' on contract [//SackConsulting/BookOrderContract]; set @order_msg = '<order id="1234" customer="22" orderdate="2012-10-01"> <LineItem ItemNumber="1" ISBN="1-12345-123-0" Quantity="1" /> </order> '; --send语句可以发送消息 send on conversation @conversation_handler message type [//SackConsulting/SendBookOrder] (@order_msg); --7.2.1检索消息 use bookDistribution go select message_type_name, --消息类型名 CAST(message_body as xml) as message, --消息 queuing_order, --队列顺序,从0开始 conversation_handle, --会话句柄 conversation_group_id --会话组id from dbo.bookstoreDistributionQueue create table dbo.bookOrderReceived ( bookOrderReceivedID int identity(1,1) not null, conversation_handle uniqueidentifier not null, conversation_group_id uniqueidentifier not null, message_body xml not null ) --7.2.2receive语句会从队列中读取消息,并且把已经读取的消息删除 declare @conversation_handler uniqueidentifier declare @conversation_group uniqueidentifier declare @order_msg xml declare @Text_response_msg varchar(max) declare @response_msg xml declare @orderID int; receive top(1) @order_msg = message_body, @conversation_handler = conversation_handle, @conversation_group = conversation_group_id from dbo.bookStoreDistributionQueue; insert into dbo.bookOrderReceived (conversation_handle,conversation_group_id,message_body) values(@conversation_handler, @conversation_group, @order_msg) select @orderID = @order_msg.value('(/order/@id)[1]','int') select @Text_response_msg = '<orderreceived id="' + CAST(@orderID as varchar(10)) + '"/>' select @response_msg = CAST(@Text_response_msg as xml); --7.2.3发送回复消息 send on conversation @conversation_handler message type [//SackConsulting/BookOrderReceived] (@response_msg); --7.3查看返回的消息,结束会话 use bookstore go create table dbo.bookOrderConfirmation ( bookorderconfirmationID int identity(1,1) not null, conversation_handle uniqueidentifier not null, datereceived datetime not null default getdate(), message_body xml not null ) declare @conversation_handler uniqueidentifier declare @conversation_group uniqueidentifier declare @order_msg xml declare @text_response_msg varcahr(max); receive top(1) @conversation_handler = conversation_handle, @order_msg = message_body from dbo.bookstorequeue insert into dbo.bookOrderConfirmation (conversation_handle,message_body) values(@conversation_handler,@order_msg) end conversation @conversation_handler; --结束会话 --7.4取出消息,判断是否是结束会话消息类型,如果是,那么结束会话 use bookDistribution go declare @conversation_handler uniqueidentifier declare @conversation_group uniqueidentifier declare @order_msg xml declare @message_type_name nvarchar(256); receive top(1) @conversation_handler = conversation_handle, @order_msg = message_body, @message_type_name = message_type_name from dbo.bookstoredistributionqueue /*====================================================== 结束会话会自动发送: http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog 的消息类型到目标数据库,双方,包括发起方和目标,必须都结束会话 ========================================================*/ if @message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' begin end conversation @conversation_handler; end --7.5查看会话端点的状态 select * from sys.conversation_endpoints --8.1设定Serivce Broker会话的优先级 ALTER DATABASE BOOKSTORE set honor_broker_priority on alter database bookdistribution set honor_broker_priority on --8.2查看数据库的属性 select name, is_honor_broker_priority_on from sys.databases where name in ('bookstore','bookdistribution') --8.3创建Service Broker优先级 use bookstore go create broker priority conversation_priority_bookordercontract_bookorderservice for conversation set ( contract_name = [//SackConsulting/BookOrderContract], local_service_name = [//SackConsulting/BookOrderService], remote_service_name = any, priority_level = 10 ) select cp.name, cp.priority, --优先级 cp.service_contract_id, sc.name, --约定名称 cp.local_service_id, s.name, --服务名称 cp.remote_service_name from sys.conversation_priorities cp inner join sys.service_contracts sc on cp.service_contract_id = sc.service_contract_id inner join sys.services s on s.service_id = cp.local_service_id use bookDistribution go create broker priority conversation_priority_bookordercontract_bookstoredistributionservice for conversation set ( contract_name = [//SackConsulting/BookOrderContract], local_service_name = [//SackConsulting/BookStoreDistributionService], remote_service_name = any, priority_level = 10 ) select cp.name, cp.priority, --优先级 cp.service_contract_id, sc.name, --约定名称 cp.local_service_id, s.name, --服务名称 cp.remote_service_name from sys.conversation_priorities cp inner join sys.service_contracts sc on cp.service_contract_id = sc.service_contract_id inner join sys.services s on s.service_id = cp.local_service_id --8.4修改优先级 use bookstore go alter broker priority conversation_priority_bookordercontract_bookorderservice for conversation set ( remote_service_name = '//SackConsulting/BookStoreDistributionService' ) use bookDistribution go alter broker priority conversation_priority_bookordercontract_bookstoredistributionservice for conversation set ( priority_level = 9 ) --8.5删除优先级 drop broker priority conversation_priority_bookordercontract_bookstoredistributionservice
创建处理消息的存储过程
前面使用了临时的T-SQL来处理从队列传入的消息,也可以通过存储过程或外部应用程序创建服务程序,来自动的激活并处理队列中的消息的服务程序,同时还可以指定同时执行的服务程序的数量。
use bookDistribution go create procedure dbo.usp_service_broker_ReceiveOrders as declare @conversation_handler uniqueidentifier declare @conversation_group uniqueidentifier declare @order_msg xml declare @text_response_msg varchar(8000) declare @response_msg xml declare @message_type_name nvarchar(156) declare @orderID int --当发生运行时错误时,会自动回滚事务 set xact_abort on begin tran; --接收消息 receive top(1) @order_msg = message_body, @conversation_handler = conversation_handle, @conversation_group = conversation_group_id, @message_type_name = message_type_name from dbo.bookStoreDistributionQueue --消息类型 if @message_type_name = '//SackConsulting/SendBookOrder' begin insert into dbo.bookOrderReceived (conversation_handle,conversation_group_id,message_body) values(@conversation_handler, @conversation_group, @order_msg) select @orderID = @order_msg.value('(/order/@id)[1]','int') select @text_response_msg = '<orderreceived id="' + CAST(@orderID as varchar(10)) + '"/>'; select @response_msg = CAST(@text_response_msg as XML); send on conversation @conversation_handler message type [//SackConsulting/BookOrderReceived] (@response_msg); end --如果收到结束会话的消息 if @message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' begin end conversation @conversation_handler; end commit tran go --修改队列设置 alter queue dbo.BookstoreDistributionQueue with activation ( status = on, procedure_name = dbo.usp_service_broker_receiveOrders, max_queue_readers = 2, --存储过程执行的最大数量 execute as self ) /*========================================== 通过drop:删除与队列关联的所有激活信息 alter queue dbo.BookstoreDistributionQueue with activation ( drop ) ============================================*/ use bookstore go declare @conversation_handler uniqueidentifier declare @order_msg xml begin dialog conversation @conversation_handler from service [//SackConsulting/BookOrderService] to service '//SackConsulting/BookStoreDistributionService' on contract [//SackConsulting/BookOrderContract]; set @order_msg = '<order id="1234" customer="22" orderdate="2012-10-01"> <LineItem ItemNumber="1" ISBN="1-12345-123-0" Quantity="1" /> </order> '; --send语句可以发送消息 send on conversation @conversation_handler message type [//SackConsulting/SendBookOrder] (@order_msg); --可以接收到一个消息,这个消息是对方在接受到下消息后,激活存储过程后,由存储过程回复的消息 select conversation_handle, CAST(message_body as xml) from dbo.bookStoreQueue
实现远程服务器的Service Broker
前面的例子是同一个SQL Server实例的两个数据库,但大部分情况会将Service Broker设置为,使用在两个或多个SQL Server实例上的数据库。为了实现跨越服务器通信,可以通过Windows身份验证或基于证书的身份验证,启用传输安全模式、启用对话安全模式、创建路由、创建远程绑定。
--在实例1上 use master go if exists(select 1 from sys.databases where name = 'bookstore') drop database bookstore else create database bookstore go alter database bookstore set enable_broker alter database bookstore set trustworthy on use bookstore go create message type [//BookConsulting/SendBookOrder] validation = well_formed_xml go create message type [//BookConsulting/BookOrderReceived] validation = well_formed_xml go create contract [//BookConsulting/BookOrderContract] ( [//BookConsulting/SendBookOrder] sent by initiator, [//BookConsulting/BookOrderReceived] sent by target ) go create queue BookStoreQueue with status = on create service [//BookConsulting/BookOrderService] on queue dbo.BookStoreQueue ( [//BookConsulting/BookOrderContract] ) --在实例2上 use master go if exists(select 1 from sys.databases where name = 'bookdistribution') drop database bookdistribution else create database bookdistribution go alter database bookdistribution set enable_broker alter database bookdistribution set trustworthy on use bookdistribution go create message type [//BookConsulting/SendBookOrder] validation = well_formed_xml go create message type [//BookConsulting/BookOrderReceived] validation = well_formed_xml go create contract [//BookConsulting/BookOrderContract] ( [//BookConsulting/SendBookOrder] sent by initiator, [//BookConsulting/BookOrderReceived] sent by target ) go create queue BookDistributionQueue with status = on create service [//BookConsulting/BookDistributionService] on queue dbo.BookDistributionQueue ( [//BookConsulting/BookOrderContract] ) --启用传输安全模式,只是限制其他实例是否能访问本地服务器的端点 --实例1 use master go --1.删除已经存在的数据库主密钥 drop master key --2.创建数据库主密钥 create master key encryption by password = '123456!@#' --3.创建证书 create certificate bookMasterCert with subject = 'book Transport Security Service Broker', expiry_date = '2012-12-31' --4.备份证书 backup certificate bookMasterCert to file = 'c:\bookMasterCert.cer' go --5.创建端点 create endpoint service_broker_book_endpoint state = started as tcp (listener_port = 4020) for service_broker ( authentication = certificate bookMasterCert, encryption = required ) --6.创建SQL Server的登录名 create login service_broker_login with password = 'service_broker_login123' --7.创建数据库用户名 create user service_broker_user for login service_broker_login --8.授予数据库用户可以连接端点 grant connect on endpoint::service_broker_book_endpoint to service_broker_login --9.通过另一个实例复制到本地服务器上的证书文件,来创建证书 create certificate bookDistributionMasterCert authorization service_broker_user from file = 'c:\bookDistributionMasterCert.cer' go --实例2 use master go --删除数据库主密钥 drop master key create master key encryption by password = '123456&^%' create certificate bookDistributionMasterCert with subject = 'bookDistribution Transport Security Service Broker', expiry_date = '2012-12-31' backup certificate bookDistributionMasterCert to file = 'c:\bookDistributionMasterCert.cer' create endpoint service_broker_bookdistribution_endpoint state = started as tcp (listener_port = 4021) for service_broker ( authentication = certificate bookDistributionMasterCert, encryption = required ) create login service_broker_login with password = 'service_broker_login123' create user service_broker_user for login service_broker_login grant connect on endpoint::service_broker_bookdistribution_endpoint to service_broker_login create certificate bookMasterCert authorization service_broker_user from file = 'c:\bookMasterCert.cer' go --启用对话安全模式 --实例1 use bookstore go --1.创建数据库主密钥 create master key encryption by password = '123456!@#' --2.创建证书,这里可以给当前数据库用户创建多个证书,不会有影响 --当接收到其他服务器传送过来的消息时,可以用这个证书来解密消息 create certificate BookStoreCert with subject = 'BookStore service broker cert', expiry_date = '2012-12-31' --3.备份证书 backup certificate bookstorecert to file = 'c:\bookstorecert.cer' go --4.创建数据库用户,此用户只可以有一个证书 create user bookDistributionUser without login go --5.通过从另一个实例复制过来的证书,来创建证书,并指定所有者为此用户 create certificate bookDistributionCert authorization bookDistributionUser --此用户只能拥有一个证书, --在发送消息时会用这个证书来加密消息 from file = 'c:\bookDistributionCert.cer' --6.授予此用户名在某个服务上发送的权限 grant send on service::[//BookConsulting/BookOrderService] to bookDistributionUser go --7.创建路由 create route route_bookDistribution with service_name = '//BookConsulting/BookDistributionService', address = 'tcp://192.168.1.16:4021' --8.创建远程绑定 create remote service binding bookDistributionBinding to service '//BookConsulting/BookDistributionService' with user = bookDistributionUser --9.开始会话,发送消息 declare @conversation_handler uniqueidentifier declare @order_msg xml; begin dialog conversation @conversation_handler from service [//BookConsulting/BookOrderService] to service '//BookConsulting/BookDistributionService' on contract [//BookConsulting/BookOrderContract] set @order_msg = '<order id="1234" customer="22" orderdate="2012-10-01"> <LineItem ItemNumber="1" ISBN="1-12345-123-0" Quantity="1" /> </order> '; --send语句可以发送消息 send on conversation @conversation_handler message type [//BookConsulting/SendBookOrder] (@order_msg); --启用对话安全模式 --实例2 use bookdistribution go create master key encryption by password = '123456&^%' --当接收到对方发送的消息后,用此证书来解密 create certificate BookDistributionCert with subject = 'BookDistribution service broker cert', expiry_date = '2012-12-31' backup certificate bookDistributioncert to file = 'c:\bookDistributioncert.cer' create user bookStoreUser without login --在发送之前,用此证书来加密消息 create certificate bookStoreCert authorization bookStoreUser from file = 'c:\bookStoreCert.cer' grant send on service::[//BookConsulting/BookDistributionService] to bookStoreUser create route route_bookStore with service_name = '//BookConsulting/BookOrderService', address = 'tcp://192.168.9.67:4020' create remote service binding bookStoreBinding to service '//BookConsulting/BookOrderService' with user = bookStoreUser --查询消息 SELECT * FROM dbo.bookdistributionqueue
事件通知
事件通知是集成到Service Broker的功能,这样可以在SQL Server实例中异步捕获SQL事件,将事件信息路由到特定的队列中。只需要最小的系统开销,就可以跟踪发生在SQL Server实例的事件,比如用户登录,存储过程重新编译,权限修改,对象处理(包括:对数据库、程序集、角色、表的create/alter/drop事件)。
使用事件通知只需要创建队列和Service Broker组件,在SQL Server中已经内建了,用来捕捉和发送事件通知的消息类型、约定。
IF NOT exists(select 1 from sys.databases where name = 'EventTracking') create database EventTracking else drop database EventTracking go use eventtracking go --1.创建队列 create queue SQLEventQueue with status = on go --2.在队列上创建服务,关联到内建的事件通知约定 create service [//EventTracking/TrackLoginAlterService] on queue SQLEventQueue ([http://schemas.microsoft.com/SQL/Notifications/PostEventNotification]); --3.查询guid select service_broker_guid --6AE234DF-C4BB-4C1B-9E08-F4EA66359B6A from sys.databases where name = 'EventTracking' --4.创建server范围的事件通知,来跟踪SQL Server实例中所有登录名的创建,修改,删除 create event notification EVENT_LoginEvent on server for create_login,alter_login,drop_login to service '//EventTracking/TrackLoginAlterService', '6AE234DF-C4BB-4C1B-9E08-F4EA66359B6A' --service_broker_guid --5.创建一个登录名,事件通知,把消息放入队列 create login login_ggg with password = '123456!@' --6.查询消息 select CAST(message_body as xml) as event_Info from dbo.sqleventqueue