【SQL Server学习笔记】Service Broker创建异步的、数据驱动的消息应用程序

Service Broker 可以创建异步的、数据驱动的消息应用程序,它允许一个数据库发送消息到其他数据库,而不需要等待响应,即使远程数据库不能立即处理这些消息,发送数据库也可以继续其他操作。通过使用T-SQL对象和命令,就可以完成管理Service Broker。

Service Broker 为SQL Server提供消息队列,这样就可以从数据库中发送异步事务性消息到队列,在队列中这些消息将会被其他服务获取和处理,该服务可能运行在其他数据库、或服务器上。另外,对于异步程序,发送一条消息,并且应用程序不需要等待原始消息已被接收、或处理的确认信息,就可以处理其他相关的任务。一旦完成特定任务,两个Service Broker 服务之间的会话就可以显式的结束。

Service Broker包含了一些开箱即用的特性,它处理当试图创建自己的异步消息系统时,可能经常遇到的复杂问题。

比如:

首先,Service Broker 消息要保证以适当的顺序,或者以他们发送的原始顺序进行接收,而且这些消息也只能被接收一次(调度程序保证不会重复的读取),并且可以作为同一个会话的一部分发送,与任务的同一个实例相关;

其次,Service Broker 保证了消息的发送,当尝试发送第一个消息时,目标数据库(消息的接收者)不可用,消息将加入到发送方的数据库的队列中,当目标数据库变为可用时,发送者将会尝试发送这个消息;

再次,由于Service Broker 是内建在SQL Server数据库中的,可以与数据库的其余部分仪器备份,所以这些消息在数据库发生故障的情况下也是可以恢复的;

最后使用Service Broker 的事件通知功能,可以跟踪数据库和SQL Server实例的事件,这个与SQL Trace相似,但事件通知是异步的,并且对SQL Server实例整体的性能的影响最小,而SQL Trace对性能的影响较大。

use master
go

if not exists(select 1 from sys.databases where name = 'BOOKSTORE')
	CREATE DATABASE bookstore
go


if not exists(select 1 from sys.databases where name = 'bookDistribution')
	CREATE DATABASE bookDistribution
go


--1.启动Service broker
alter database bookstore set enable_broker   --disable_broker可以禁用

alter database bookstore set trustworthy on  --指明SQL Server实例是否信任该数据库以及其中的内容

alter database bookDistribution set enable_broker

alter database bookDistribution set trustworthy on


--2.创建数据库主密钥
use bookstore
go

create master key
encryption by password = '123abc'

use bookDistribution
go

create master key
encryption by password = 'abc123'



--3.创建消息类型,定义了从Service Broker端点发送的消息中包含的数据类型
use bookstore
go

create message type [//SackConsulting/SendBookOrder]
validation = WELL_FORMED_XML
go

create message type [//SackConsulting/BookOrderReceived]
validation = WELL_FORMED_XML



use bookDistribution
go


create message type [//SackConsulting/SendBookOrder]
validation = well_formed_xml
go

create message type [//SackConsulting/BookOrderReceived]
validation = well_formed_xml
go


--4.创建约定,定义了在任务级别可以发送或者接收的消息类型
use bookstore
go

create contract [//SackConsulting/BookOrderContract]
(
	[//SackConsulting/SendBookOrder] sent by initiator,  --可以由会话的发起方发送的消息类型
	[//SackConsulting/BookOrderReceived] sent by target  --可以由会话的目标方发送的消息类型
)


use bookDistribution
go

create contract [//SackConsulting/BookOrderContract]
(
	[//SackConsulting/SendBookOrder] sent by initiator,
	[//SackConsulting/BookOrderReceived] sent by target 
)
go


--5.创建队列,队列用来保存数据
--通过select语句来查询队列,或者用receive命令从队列检索一条、多条消息
--检索程序可以是外部的.net程序,不过通过存储过程来实现更方便
use bookstore
go

create queue bookStoreQueue
with status = on 
go


--在创建queue时可以把自动处理消息的程序,绑定到队列的激活选项,
--此处通过手动控制队列中的信息交换
use bookDistribution
go

create queue bookStoreDistributionQueue
with status = on 
go



--6.创建服务这样就可以把消息队列绑定到一个或者多个约定上
--服务使用队列和约定来定义一个或一组任务
--服务是消息的发起方和接收方强制约定的规则,并将消息路由到正确的队列
use bookstore
go

create service [//SackConsulting/BookOrderService]
on queue dbo.bookStoreQueue ([//SackConsulting/BookOrderContract])


use bookDistribution
go

create service [//SackConsulting/BookStoreDistributionService]
on queue dbo.bookStoreDistributionQueue ([//SackConsulting/BookOrderContract])




--7.1开始会话,发送消息
use bookstore
go

declare @conversation_handler uniqueidentifier;
declare @order_msg xml;


begin dialog conversation @conversation_handler
from service [//SackConsulting/BookOrderService]
to service '//SackConsulting/BookStoreDistributionService'
on contract [//SackConsulting/BookOrderContract];


set @order_msg = 
	'<order id="1234" customer="22" orderdate="2012-10-01">
	<LineItem ItemNumber="1" ISBN="1-12345-123-0" Quantity="1" />
	</order>
	';

--send语句可以发送消息	
send on conversation @conversation_handler
message type [//SackConsulting/SendBookOrder]
(@order_msg);


--7.2.1检索消息
use bookDistribution
go

select message_type_name,      --消息类型名
       CAST(message_body as xml) as message,  --消息
       queuing_order,                         --队列顺序,从0开始
       
       conversation_handle,                   --会话句柄
       conversation_group_id                  --会话组id
from dbo.bookstoreDistributionQueue



create table dbo.bookOrderReceived
(
	bookOrderReceivedID int identity(1,1) not null,
	conversation_handle uniqueidentifier not null,
	conversation_group_id uniqueidentifier not null,
	message_body xml not null
)


--7.2.2receive语句会从队列中读取消息,并且把已经读取的消息删除
declare @conversation_handler uniqueidentifier
declare @conversation_group uniqueidentifier

declare @order_msg xml

declare @Text_response_msg varchar(max)
declare @response_msg xml

declare @orderID int;


receive top(1)
	@order_msg = message_body,
	@conversation_handler = conversation_handle,
	@conversation_group = conversation_group_id
from dbo.bookStoreDistributionQueue;


insert into dbo.bookOrderReceived
(conversation_handle,conversation_group_id,message_body)
values(@conversation_handler,
       @conversation_group,
       @order_msg)


select @orderID = @order_msg.value('(/order/@id)[1]','int')

select @Text_response_msg = 
	'<orderreceived id="' + CAST(@orderID as varchar(10)) + '"/>'

select @response_msg = CAST(@Text_response_msg as xml);

--7.2.3发送回复消息
send on conversation @conversation_handler
message type [//SackConsulting/BookOrderReceived]
(@response_msg);



--7.3查看返回的消息,结束会话
use bookstore
go


create table dbo.bookOrderConfirmation
(
	bookorderconfirmationID int identity(1,1) not null,
	conversation_handle uniqueidentifier not null,
	datereceived datetime not null default getdate(),
	message_body xml not null
)


declare @conversation_handler uniqueidentifier
declare @conversation_group uniqueidentifier

declare @order_msg xml
declare @text_response_msg varcahr(max);


receive top(1)
	@conversation_handler = conversation_handle,
	@order_msg = message_body
from dbo.bookstorequeue


insert into dbo.bookOrderConfirmation
(conversation_handle,message_body)
values(@conversation_handler,@order_msg)


end conversation @conversation_handler;  --结束会话



--7.4取出消息,判断是否是结束会话消息类型,如果是,那么结束会话
use bookDistribution
go


declare @conversation_handler uniqueidentifier
declare @conversation_group uniqueidentifier

declare @order_msg xml

declare @message_type_name nvarchar(256);


receive top(1)
	@conversation_handler =  conversation_handle,
	@order_msg = message_body,
	@message_type_name = message_type_name
from dbo.bookstoredistributionqueue

/*======================================================
结束会话会自动发送:
http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog
的消息类型到目标数据库,双方,包括发起方和目标,必须都结束会话

========================================================*/
if @message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
begin
	end conversation @conversation_handler;
end


--7.5查看会话端点的状态
select *
from sys.conversation_endpoints




--8.1设定Serivce Broker会话的优先级
ALTER DATABASE BOOKSTORE
set honor_broker_priority on

alter database bookdistribution
set honor_broker_priority on


--8.2查看数据库的属性
select name,
       is_honor_broker_priority_on
from sys.databases
where name in ('bookstore','bookdistribution')


--8.3创建Service Broker优先级
use bookstore
go

create broker priority conversation_priority_bookordercontract_bookorderservice
for conversation
set (
		contract_name = [//SackConsulting/BookOrderContract],
		local_service_name = [//SackConsulting/BookOrderService],
		remote_service_name = any,
		priority_level = 10
	)

select cp.name,
       cp.priority,             --优先级
       
       cp.service_contract_id,
       sc.name,                 --约定名称
       
       cp.local_service_id,
       s.name,                  --服务名称       cp.remote_service_name
       
from sys.conversation_priorities cp

inner join sys.service_contracts sc
        on cp.service_contract_id = sc.service_contract_id
        
inner join sys.services s
        on s.service_id = cp.local_service_id


use bookDistribution
go

create broker priority conversation_priority_bookordercontract_bookstoredistributionservice
for conversation
set
	(
		contract_name = [//SackConsulting/BookOrderContract],
		local_service_name = [//SackConsulting/BookStoreDistributionService],
		remote_service_name = any,
		priority_level = 10
	)

select cp.name,
       cp.priority,             --优先级
       
       cp.service_contract_id,
       sc.name,                 --约定名称
       
       cp.local_service_id,
       s.name,                  --服务名称
       cp.remote_service_name       
from sys.conversation_priorities cp
inner join sys.service_contracts sc
        on cp.service_contract_id = sc.service_contract_id
        
inner join sys.services s
        on s.service_id = cp.local_service_id
	


--8.4修改优先级
use bookstore
go

alter broker priority conversation_priority_bookordercontract_bookorderservice
for conversation
set
	(
		remote_service_name = '//SackConsulting/BookStoreDistributionService'
	)


use bookDistribution
go

alter broker priority conversation_priority_bookordercontract_bookstoredistributionservice
for conversation
set
	(
		priority_level = 9
	)
	

--8.5删除优先级
drop broker priority conversation_priority_bookordercontract_bookstoredistributionservice


 创建处理消息的存储过程

前面使用了临时的T-SQL来处理从队列传入的消息,也可以通过存储过程或外部应用程序创建服务程序,来自动的激活并处理队列中的消息的服务程序,同时还可以指定同时执行的服务程序的数量。

use bookDistribution
go

create procedure dbo.usp_service_broker_ReceiveOrders
as

declare @conversation_handler uniqueidentifier
declare @conversation_group uniqueidentifier
declare @order_msg xml
declare @text_response_msg varchar(8000)

declare @response_msg xml
declare @message_type_name nvarchar(156)
declare @orderID int

--当发生运行时错误时,会自动回滚事务
set xact_abort on


begin tran;
	
	--接收消息
	receive top(1) 
		@order_msg = message_body,
	    @conversation_handler = conversation_handle,
	    @conversation_group = conversation_group_id,
	    @message_type_name = message_type_name
	from dbo.bookStoreDistributionQueue
	
	--消息类型
	if @message_type_name = '//SackConsulting/SendBookOrder'
	begin
		insert into dbo.bookOrderReceived
		(conversation_handle,conversation_group_id,message_body)
		values(@conversation_handler,
		       @conversation_group,
		       @order_msg)
		       
		select @orderID = @order_msg.value('(/order/@id)[1]','int')
		
		select @text_response_msg = '<orderreceived id="' + 
		                             CAST(@orderID as varchar(10)) +
		                             '"/>';
		
		select @response_msg = CAST(@text_response_msg as XML);
		
		send on conversation @conversation_handler
		message type [//SackConsulting/BookOrderReceived]
		(@response_msg);
	end
	
	--如果收到结束会话的消息
	if @message_type_name = 
	       'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    begin	
		end conversation @conversation_handler;
	end
	
commit tran

go
					

--修改队列设置
alter queue dbo.BookstoreDistributionQueue
with activation 
(
	status = on,
	procedure_name = dbo.usp_service_broker_receiveOrders,
	max_queue_readers = 2,   --存储过程执行的最大数量
	execute as self
)	


/*==========================================
通过drop:删除与队列关联的所有激活信息

alter queue dbo.BookstoreDistributionQueue
with activation 
(
	drop
)	
============================================*/   

use bookstore
go

declare @conversation_handler uniqueidentifier

declare @order_msg xml


begin dialog conversation @conversation_handler
from service [//SackConsulting/BookOrderService]
to service '//SackConsulting/BookStoreDistributionService'
on contract [//SackConsulting/BookOrderContract];


set @order_msg = 
	'<order id="1234" customer="22" orderdate="2012-10-01">
	<LineItem ItemNumber="1" ISBN="1-12345-123-0" Quantity="1" />
	</order>
	';

--send语句可以发送消息	
send on conversation @conversation_handler
message type [//SackConsulting/SendBookOrder]
(@order_msg);


--可以接收到一个消息,这个消息是对方在接受到下消息后,激活存储过程后,由存储过程回复的消息
select conversation_handle,
       CAST(message_body as xml)
from dbo.bookStoreQueue     
   


 实现远程服务器的Service Broker

前面的例子是同一个SQL Server实例的两个数据库,但大部分情况会将Service Broker设置为,使用在两个或多个SQL Server实例上的数据库。为了实现跨越服务器通信,可以通过Windows身份验证或基于证书的身份验证,启用传输安全模式、启用对话安全模式、创建路由、创建远程绑定。

--在实例1上
use master
go

if exists(select 1 from sys.databases where name = 'bookstore')
	drop database bookstore
else
	create database bookstore
go


alter database bookstore set enable_broker

alter database bookstore set trustworthy on


use bookstore
go

create message type [//BookConsulting/SendBookOrder]
validation = well_formed_xml
go

create message type [//BookConsulting/BookOrderReceived]
validation = well_formed_xml
go


create contract [//BookConsulting/BookOrderContract]
(
	[//BookConsulting/SendBookOrder] sent by initiator,
	[//BookConsulting/BookOrderReceived] sent by target
)
go



create queue BookStoreQueue
with status = on


create service [//BookConsulting/BookOrderService]
on queue dbo.BookStoreQueue
(
	[//BookConsulting/BookOrderContract]
)



--在实例2上
use master
go

if exists(select 1 from sys.databases where name = 'bookdistribution')
	drop database bookdistribution
else
	create database bookdistribution
go


alter database bookdistribution set enable_broker

alter database bookdistribution set trustworthy on


use bookdistribution
go

create message type [//BookConsulting/SendBookOrder]
validation = well_formed_xml
go

create message type [//BookConsulting/BookOrderReceived]
validation = well_formed_xml
go


create contract [//BookConsulting/BookOrderContract]
(
	[//BookConsulting/SendBookOrder] sent by initiator,
	[//BookConsulting/BookOrderReceived] sent by target
)
go



create queue BookDistributionQueue
with status = on


create service [//BookConsulting/BookDistributionService]
on queue dbo.BookDistributionQueue
(
	[//BookConsulting/BookOrderContract]
)




--启用传输安全模式,只是限制其他实例是否能访问本地服务器的端点

--实例1

use master 
go


--1.删除已经存在的数据库主密钥
drop master key


--2.创建数据库主密钥
create master key encryption by password = '123456!@#'


--3.创建证书
create certificate bookMasterCert
with subject = 'book Transport Security Service Broker',
	 expiry_date = '2012-12-31' 


--4.备份证书
backup certificate bookMasterCert
to file = 'c:\bookMasterCert.cer'
go


--5.创建端点
create endpoint service_broker_book_endpoint
state = started
as tcp (listener_port = 4020)
for service_broker (
						authentication = certificate bookMasterCert,
						encryption = required
                   )


--6.创建SQL Server的登录名
create login service_broker_login
with password = 'service_broker_login123'


--7.创建数据库用户名
create user service_broker_user
for login service_broker_login


--8.授予数据库用户可以连接端点
grant connect on endpoint::service_broker_book_endpoint 
              to service_broker_login


--9.通过另一个实例复制到本地服务器上的证书文件,来创建证书
create certificate bookDistributionMasterCert
authorization service_broker_user
from file = 'c:\bookDistributionMasterCert.cer'
go




--实例2

use master 
go


--删除数据库主密钥
drop master key

create master key encryption by password = '123456&^%'


create certificate bookDistributionMasterCert
with subject = 'bookDistribution Transport Security Service Broker',
	 expiry_date = '2012-12-31' 


backup certificate bookDistributionMasterCert
to file = 'c:\bookDistributionMasterCert.cer'


create endpoint service_broker_bookdistribution_endpoint
state = started
as tcp (listener_port = 4021)
for service_broker (
						authentication = certificate bookDistributionMasterCert,
						encryption = required
                   )


create login service_broker_login
with password = 'service_broker_login123'


create user service_broker_user
for login service_broker_login


grant connect on endpoint::service_broker_bookdistribution_endpoint 
              to service_broker_login


create certificate bookMasterCert
authorization service_broker_user
from file = 'c:\bookMasterCert.cer'
go




--启用对话安全模式
--实例1
use bookstore
go

--1.创建数据库主密钥
create master key encryption by password = '123456!@#'  


--2.创建证书,这里可以给当前数据库用户创建多个证书,不会有影响
--当接收到其他服务器传送过来的消息时,可以用这个证书来解密消息
create certificate BookStoreCert
with subject = 'BookStore service broker cert',
     expiry_date = '2012-12-31'


--3.备份证书
backup certificate bookstorecert
to file = 'c:\bookstorecert.cer'
go


--4.创建数据库用户,此用户只可以有一个证书
create user bookDistributionUser
without login
go


--5.通过从另一个实例复制过来的证书,来创建证书,并指定所有者为此用户
create certificate bookDistributionCert
authorization bookDistributionUser        --此用户只能拥有一个证书,
                                          --在发送消息时会用这个证书来加密消息
from file = 'c:\bookDistributionCert.cer'


--6.授予此用户名在某个服务上发送的权限
grant send on service::[//BookConsulting/BookOrderService] to bookDistributionUser
go


--7.创建路由
create route route_bookDistribution
with service_name = '//BookConsulting/BookDistributionService',
     address = 'tcp://192.168.1.16:4021'


--8.创建远程绑定
create remote service binding bookDistributionBinding
to service '//BookConsulting/BookDistributionService'
with user = bookDistributionUser


--9.开始会话,发送消息
declare @conversation_handler uniqueidentifier
declare @order_msg xml;


begin dialog conversation @conversation_handler
from service [//BookConsulting/BookOrderService]
to service '//BookConsulting/BookDistributionService'
on contract [//BookConsulting/BookOrderContract]


set @order_msg = 
	'<order id="1234" customer="22" orderdate="2012-10-01">
	<LineItem ItemNumber="1" ISBN="1-12345-123-0" Quantity="1" />
	</order>
	';


--send语句可以发送消息	
send on conversation @conversation_handler
message type [//BookConsulting/SendBookOrder]
(@order_msg);




--启用对话安全模式
--实例2
use bookdistribution
go

create master key encryption by password = '123456&^%'  


--当接收到对方发送的消息后,用此证书来解密
create certificate BookDistributionCert
with subject = 'BookDistribution service broker cert',
     expiry_date = '2012-12-31'


backup certificate bookDistributioncert
to file = 'c:\bookDistributioncert.cer'


create user bookStoreUser
without login


--在发送之前,用此证书来加密消息
create certificate bookStoreCert
authorization bookStoreUser
from file = 'c:\bookStoreCert.cer'


grant send on service::[//BookConsulting/BookDistributionService] to bookStoreUser


create route route_bookStore
with service_name = '//BookConsulting/BookOrderService',
     address = 'tcp://192.168.9.67:4020'


create remote service binding bookStoreBinding
to service '//BookConsulting/BookOrderService'
with user = bookStoreUser


--查询消息
SELECT *
FROM dbo.bookdistributionqueue  
        


事件通知
事件通知是集成到Service Broker的功能,这样可以在SQL Server实例中异步捕获SQL事件,将事件信息路由到特定的队列中。只需要最小的系统开销,就可以跟踪发生在SQL Server实例的事件,比如用户登录,存储过程重新编译,权限修改,对象处理(包括:对数据库、程序集、角色、表的create/alter/drop事件)。

使用事件通知只需要创建队列和Service Broker组件,在SQL Server中已经内建了,用来捕捉和发送事件通知的消息类型、约定。

IF NOT exists(select 1
              from sys.databases
              where name = 'EventTracking')
              
	create database EventTracking
else
	drop database EventTracking
	
go


use eventtracking
go


--1.创建队列
create queue SQLEventQueue
with status = on
go


--2.在队列上创建服务,关联到内建的事件通知约定
create service [//EventTracking/TrackLoginAlterService]
on queue SQLEventQueue
([http://schemas.microsoft.com/SQL/Notifications/PostEventNotification]);


--3.查询guid
select service_broker_guid --6AE234DF-C4BB-4C1B-9E08-F4EA66359B6A
from sys.databases
where name = 'EventTracking'


--4.创建server范围的事件通知,来跟踪SQL Server实例中所有登录名的创建,修改,删除
create event notification EVENT_LoginEvent
on server
for create_login,alter_login,drop_login
to service '//EventTracking/TrackLoginAlterService',
   '6AE234DF-C4BB-4C1B-9E08-F4EA66359B6A'   --service_broker_guid


--5.创建一个登录名,事件通知,把消息放入队列
create login login_ggg with password = '123456!@'


--6.查询消息
select CAST(message_body as xml) as event_Info
from dbo.sqleventqueue
原文地址:https://www.cnblogs.com/momogua/p/8304592.html