SSIS+CDC 增量抽取数据

一 、建立cdc记录表用于每次增量的时间节点

    create table dbo.cdc_capture_log (
    cdc_capture_log_id int identity not null
    , capture_instance nvarchar(50) not null
    , start_time datetime not null
    , min_lsn binary(10) not null
    , max_lsn binary(10) not null
    , end_time datetime null
    , status_code int not null default 0)


capture_instance:資料表開啟CDC所指定的值。
start_time、end_time:紀錄執行所花的時間。
min_lsn、max_lsn:表示CDC記錄每次更改LSN的範圍。
status_code:當SSIS成功完成時,status_code=1。


二、create procedure dbo.usp_init_cdc_capture_log

    create procedure dbo.usp_init_cdc_capture_log
    @capture_instance nvarchar(50)
    as
    begin
    set nocount on;
    declare
    @start_lsn binary(10)
    , @end_lsn binary(10)
    , @prev_max_lsn binary(10)
    --get the max LSN for the capture instance from
    --the last extract
    select @prev_max_lsn = max(max_lsn)
    from dbo.cdc_capture_log
    where capture_instance = @capture_instance
    -- if no row found in cdc_capture_log get the min lsn
    -- for the capture instance
    if @prev_max_lsn is null
    set @start_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
    else
    set @start_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn)
    -- get the max lsn
    set @end_lsn = sys.fn_cdc_get_max_lsn()
    insert into dbo.cdc_capture_log
    (capture_instance,start_time,min_lsn,max_lsn)
    values
    (@capture_instance,getdate(),@start_lsn,@end_lsn)
    select cast(scope_identity() as int) cdc_capture_log_id
    end

該SP幫助我們取得上次所執行的max_lsn,如果有找到資料就呼叫sys.fn.cdc_increment_lsn並設定@start_lsn,
否則就呼叫sys.fn_cdc_get_min_lsn並取得lsn,最後會返回cdc_capture_log_id,
後續將利用cdc_capture_log_id來執行相關資料更新。


create procedure dbo.usp_end_cdc_capture_log

    create procedure dbo.usp_end_cdc_capture_log
    @capture_log_id int
    as
    begin
    set nocount on;
    update dbo.cdc_capture_log set
    end_time = getdate(),
    status_code = 1
    where cdc_capture_log_id = @cdc_capture_log_id
    end

該SP幫助我們更新cdc_capture_log資料表的結束時間和狀態。

create procedure dbo.usp_extract_userm_capture_log

    create procedure dbo.usp_extract_userm_capture_log
    @capture_log_id int
    as
    begin
    set nocount on;
    declare
    @start_lsn binary(10)
    ,@end_lsn binary(10)
    -- get the lsn range to process
    select
    @start_lsn = min_lsn
    ,@end_lsn = max_lsn
    from dbo.cdc_capture_log
    where cdc_capture_log_id = @cdc_capture_log_id
    -- extract and return the changes
    select m.tran_end_time modified_ts, x.*
    from cdc.fn_cdc_get_net_changes_dbo_userm (
    @start_lsn, @end_lsn, 'all'
    ) x
    join cdc.lsn_time_mapping m
    on m.start_lsn = x.__$start_lsn ;
    end

該SP幫助我們取得lsn範圍(透過@cdc_capture_log_id查詢cdc_capture_log資料表),
並透過呼叫cdc.fn_cdc_get_net_changes_dbo_userm和cdc.lsn_time_mapping來取得lsn範圍中發生的所有資料變更。

三、設計SSIS控制流程和資料流程

編輯Exec usp_init_cdc_capture_log

创建全局变量

編輯資料流程

編輯OLE DB來源


編輯條件式分割


 

編輯update(oledb命令)

編輯insert


最后就是测试,在数据源做添删改,运行包,查看目标数据是否同步。

原文地址:https://www.cnblogs.com/purple5252/p/10335110.html