PostgreSQL:比较两个表的数据是否一致的函数

最近开发中遇到一个小需求:如何比较两个数据库中指定表的数据是否一致。
为此写了一个简单的函数,供大家参考。使用前需要先安装 dblink 扩展,并按函数体开头注释中的建表语句创建三张结果表。

CREATE OR REPLACE FUNCTION public.f_compare_tabledata(
    pi_localtablename character varying, 
    pi_localtable_excludecolumn character varying, 
    pi_remotetablename character varying, 
    pi_remotetable_excludecolumn character varying, 
    OUT po_result character varying
)
 RETURNS character varying
 LANGUAGE plpgsql
AS $function$
-- Compares the rows of a local table with the rows of a table reached
-- through dblink and stores the outcome in three result tables.
--
-- Parameters:
--   pi_localtablename            local table (anything accepted by ::regclass)
--   pi_localtable_excludecolumn  comma-separated local columns to ignore (may be NULL)
--   pi_remotetablename           remote table (resolved via ::regclass on the remote side)
--   pi_remotetable_excludecolumn comma-separated remote columns to ignore (may be NULL)
--   po_result (OUT)              fixed text containing follow-up queries for
--                                inspecting the result tables
--
-- How it works:
--   1. Builds, per side, one expression that concatenates every compared
--      column (NULL-safe) with the separator ##$$##.
--   2. Loads both row sets, ordered by all compared columns and numbered by
--      a sequence, into tmp_compare_tabledata_result (rows are paired by number).
--   3. Writes per-side counts of duplicated row strings into
--      tmp_compare_tabledata_repeat.
--   4. Full-outer-joins local and remote row strings on equality into
--      tmp_compare_tabledata_result2 and flags each row 'eq' / 'neq'.
--
-- Side effects: truncates the three result tables and drops/recreates the
-- sequences public.seq_compare_localdata and public.seq_compare_remotedata.
--
-- Prerequisites: the dblink extension and the three result tables (DDL in
-- the block comment below) must already exist.
DECLARE
    -- NOTE(review): the connection string, including the password, is hard-coded
    -- and is also printed by the RAISE NOTICE below; consider a foreign server
    -- plus user mapping instead.
    lc_remote_conn constant text := 'dbname=peiybdb host=127.0.0.1 port=5432 user=peiyb password=peiybpeiyb';

    -- Catalog query shared by both sides. format() placeholders:
    --   1st %L = table name, 2nd %L = comma-separated list of excluded columns.
    -- Returns the NULL-safe concatenation expression and the ORDER BY column list.
    lc_colmeta_template constant text := $q$
        select string_agg(
                   case
                       when t0.typname in ('bpchar','varchar')
                           then 'coalesce('||quote_ident(t0.attname::text)||','''')'
                       when t0.typname in ('int4','int8','numeric')
                           then 'coalesce('||quote_ident(t0.attname::text)||'::varchar,'''')'
                       when t0.typname in ('timestamp')
                           then 'coalesce('||quote_ident(t0.attname::text)||',''1990-01-01 00:00:01''::timestamp)'
                       -- every other type is cast to text so that a single NULL
                       -- value cannot turn the whole concatenated row into NULL
                       else 'coalesce('||quote_ident(t0.attname::text)||'::text,'''')'
                   end,
                   '||''##$$##''||' order by t0.attnum)                          as colstr_select,
               string_agg(quote_ident(t0.attname::text), ',' order by t0.attnum) as colstr_orderby
          from ( select pc.attname, pc.attnum, pt.typname
                   from pg_attribute pc
                        left outer join pg_type pt
                                     on pc.atttypid = pt.oid
                  where pc.attrelid = %L::regclass
                    and pc.attnum >= 1
                    -- dropped columns keep a placeholder row in pg_attribute
                    and not pc.attisdropped
                    and pc.attname not in ( select trim(both ' ' from x.colname)
                                              from regexp_split_to_table(%L, ',') as x(colname) )
               ) t0
    $q$;

    lv_colstr_orderby_local   varchar;
    lv_colstr_orderby_remote  varchar;
    lv_colstr_select_local    varchar;
    lv_colstr_select_remote   varchar;

    lv_sql_remote text;
    lv_sql_insert text;
begin

    /* 
    Result tables required by this function:

    create table tmp_compare_tabledata_repeat(
        local_tablerepeat  varchar(100),
        remote_tablerepeat varchar(100)
    );

    create table tmp_compare_tabledata_result(
        id int8,
        local_tablename  varchar(100),
        local_tabledata  text,
        remote_tablename varchar(100),
        remote_tabledata text,
        eq_flag          varchar(10)
    );

    create table tmp_compare_tabledata_result2(
        local_id text,
        local_tablename  varchar(100),
        local_tabledata  text,
        remote_id text,
        remote_tablename varchar(100),
        remote_tabledata text,
        eq_flag          varchar(10)
    );

    */

    -- Fail early with a readable message instead of a NULL dynamic statement.
    if pi_localtablename is null or pi_remotetablename is null then
        raise exception 'f_compare_tabledata: local and remote table names must not be null';
    end if;

    truncate table tmp_compare_tabledata_repeat;
    truncate table tmp_compare_tabledata_result;
    truncate table tmp_compare_tabledata_result2;

    -- Recreate both numbering sequences so that ids start at 1 on every run.
    drop sequence if exists public.seq_compare_localdata;
    create sequence public.seq_compare_localdata
        increment by 1
        minvalue 1
        maxvalue 9223372036854775807
        start 1;

    drop sequence if exists public.seq_compare_remotedata;
    create sequence public.seq_compare_remotedata
        increment by 1
        minvalue 1
        maxvalue 9223372036854775807
        start 1;

    -- Column expressions for the local table.
    execute format(lc_colmeta_template,
                   pi_localtablename,
                   coalesce(pi_localtable_excludecolumn, ''))
       into lv_colstr_select_local,
            lv_colstr_orderby_local;

    -- Column expressions for the remote table; the same catalog query is
    -- executed on the remote server.
    select t.colstr_select,
           t.colstr_orderby
      into lv_colstr_select_remote,
           lv_colstr_orderby_remote
      from dblink(lc_remote_conn,
                  format(lc_colmeta_template,
                         pi_remotetablename,
                         coalesce(pi_remotetable_excludecolumn, '')))
           as t (colstr_select  varchar,
                 colstr_orderby varchar);

    if lv_colstr_select_local is null or lv_colstr_select_remote is null then
        raise exception 'f_compare_tabledata: no columns left to compare for % / %',
                        pi_localtablename, pi_remotetablename;
    end if;

    -- Statement executed on the remote side. The table name has already passed
    -- the remote ::regclass cast in the metadata query above.
    lv_sql_remote := format('select %s as tabdata from %s order by %s',
                            lv_colstr_select_remote,
                            pi_remotetablename,
                            lv_colstr_orderby_remote);

    -- Load both ordered row sets; rows sharing a sequence number end up on the
    -- same result row (max() merges the local and the remote half).
    lv_sql_insert := format($q$
        insert into tmp_compare_tabledata_result
        (
           id,
           local_tablename,
           local_tabledata,
           remote_tablename,
           remote_tabledata
        )
        with tmp_local as (
           select nextval('public.seq_compare_localdata'::regclass) as id,
                  %L::varchar as local_tablename,
                  t0.tabdata::text as local_tabledata
             from (
                select %s as tabdata
                  from %s
                 order by %s
                  ) t0
        ), tmp_remote as (
           select nextval('public.seq_compare_remotedata'::regclass) as id,
                  %L::varchar as remote_tablename,
                  t1.tabdata::text as remote_tabledata
             from dblink(%L, %L) as t1 (tabdata text)
        )
        select p0.id,
               max(p0.local_tablename)  as local_tablename,
               max(p0.local_tabledata)  as local_tabledata,
               max(p0.remote_tablename) as remote_tablename,
               max(p0.remote_tabledata) as remote_tabledata
          from (
                select tc.id,
                       tc.local_tablename,
                       tc.local_tabledata,
                       ''::varchar as remote_tablename,
                       ''::text    as remote_tabledata
                  from tmp_local tc
                union all
                select tr.id,
                       ''::varchar as local_tablename,
                       ''::text    as local_tabledata,
                       tr.remote_tablename,
                       tr.remote_tabledata
                  from tmp_remote tr
               ) p0
         group by p0.id
         order by p0.id
    $q$,
        pi_localtablename,
        lv_colstr_select_local,
        pi_localtablename,
        lv_colstr_orderby_local,
        pi_remotetablename,
        lc_remote_conn,
        lv_sql_remote);

    raise notice 'f_compare_tabledata,%',lv_sql_insert;

    execute lv_sql_insert;

    -- Per side: number of distinct row strings that occur more than once.
    insert into tmp_compare_tabledata_repeat(
        local_tablerepeat,
        remote_tablerepeat
    )
    with local_dups as (
        select r.local_tabledata,
               count(1) as cnt
          from tmp_compare_tabledata_result r
         where r.local_tabledata is not null
           and r.local_tabledata <> ''
         group by r.local_tabledata
        having count(1) > 1
    ), remote_dups as (
        select r.remote_tabledata,
               count(1) as cnt
          from tmp_compare_tabledata_result r
         where r.remote_tabledata is not null
           and r.remote_tabledata <> ''
         group by r.remote_tabledata
        having count(1) > 1
    )
    select coalesce(pi_localtablename,'')||' :重复值有 '||coalesce((select count(cnt)::varchar from local_dups),'0'),
           coalesce(pi_remotetablename,'')||' :重复值有 '||coalesce((select count(cnt)::varchar from remote_dups),'0')
    ;

    -- Match local and remote row strings by equality; unmatched rows keep a
    -- NULL on the opposite side. Both sides are filtered the same way (rows
    -- that actually carry data for that side have a non-empty table name).
    insert into tmp_compare_tabledata_result2(
        local_tabledata,
        remote_tabledata,
        local_id,
        local_tablename,
        remote_id,
        remote_tablename
    )
    select t0.local_tabledata,
           t1.remote_tabledata,
           string_agg(t0.id::varchar,',') as local_id,
           max(t0.local_tablename)        as local_tablename,
           string_agg(t1.id::varchar,',') as remote_id,
           max(t1.remote_tablename)       as remote_tablename
      from ( select *
               from tmp_compare_tabledata_result
              where local_tablename is not null
                and local_tablename <> '' ) t0
           full outer join
           ( select *
               from tmp_compare_tabledata_result
              where remote_tablename is not null
                and remote_tablename <> '' ) t1
                        on t0.local_tabledata = t1.remote_tabledata
     group by t0.local_tabledata,
              t1.remote_tabledata
    ;

    -- Intentionally no WHERE clause: the table was truncated above, so every
    -- row belongs to this run. A NULL on either side yields 'neq'.
    update tmp_compare_tabledata_result2
       set eq_flag = case when local_tabledata = remote_tabledata then 'eq'
                          else 'neq'
                      end
    ;

    -- Follow-up queries for the caller. Dollar quoting keeps the embedded ''
    -- literals intact. The 'sucess' spelling is kept as-is in case callers
    -- match on the prefix.
    po_result := $msg$sucess: 
select * from tmp_compare_tabledata_repeat; --查看重复值情况
select local_tabledata,count(1)   from tmp_compare_tabledata_result where local_tablename is not null  and local_tablename <> ''  group by local_tabledata  having count(1) >1 --查看本地表重复值
select remote_tabledata,count(1)  from tmp_compare_tabledata_result where remote_tablename is not null and remote_tablename <> '' group by remote_tabledata having count(1) >1 --查看远程表重复值
select * from tmp_compare_tabledata_result2 where eq_flag = 'neq' order by local_tabledata,remote_tabledata -- 查看不相等的数据
$msg$;
    return;
END;
$function$
原文地址:https://www.cnblogs.com/ctypyb2002/p/9792949.html