学习dbms_parallel_execute包

一、简介

  ORACLE11g R2版本的新特性之一就是引进了DBMS_PARALLEL_EXECUTE包,使用DBMS_PARALLEL_EXECUTE包批量并行递增式的更新表。

  更多ORACLE11g新特性请参考:http://www.cnblogs.com/oracle-dba/articles/3632223.html
基本原理
  (1) 把数据集分割成小的块(chunk),可基于rowid进行分块,也可根据指定范围的行数(rows)进行分块。
  (2) 在每一个块上以并行的方式应用update语句,在每个块执行完成后,立即提交,原理是通过调用JOB进行并发操作。
好处在于
  (1) 在执行update操作时,仅仅锁住一个chunk而非锁住整个表;
  (2) 因为对每个chunk 执行完毕就提交,所以当update操作失败后,之前变更的并不会回滚;
  (3) 减小回滚空间(undo)的使用;
  (4) 提高性能,可根据服务器的性能设置chunk_size与parallel_level的大小。
备注:parallel_level取决于CPU的个数,以及parallel_threads_per_cpu。

DBMS_PARALLEL_EXECUTE 使用三种将一个表的数据分割成多个chunk的方法:
  (1) CREATE_CHUNKS_BY_NUMBER_COL : 通过指定的字段来切割表
  (2) CREATE_CHUNKS_BY_ROWID : 通过ROWID来切割表,本文只介绍通过BY ROWID 进行分割
  (4) CREATE_CHUNKS_BY_SQL : 通过用户提供的sql语句来切割表

二、实践操作

  DBMS_PARALLEL_EXECUTE这个包操作起来比较简单,大体步骤为:

  (1)创建任务task,即调用create_task()过程;

  (2)创建分块规则,即调用create_chunk_by_rowid()或者create_chunk_by_number_col()等过程;

  (3)写动态update的sql语句,复杂和简单的均可,可以通过dbms_output.put_line()来验证动态sql的正确性;

  (4)运行task任务,即调用run_task()过程;

  (5)监控task任务,可通过视图user_parallel_execute_tasks和user_parallel_execute_chunks来监控task状态;

  (6)task成功运行完毕后,将任务删除,即调用drop_task()过程。

 2.1 准备工作

   给用户授权

    调用dbms_parallel_execute包时,需要有执行create job的权限,因此需要给用于赋予create job权限。

      grant create job to username;

   也可把grant execute on dbms_scheduler to username;赋予某用户。

 2.2 写代码

   实践操作代码,有改动,仅做参考。

CREATE OR REPLACE PROCEDURE Pro_Parallel_Exec_Update(i_Taskid IN VARCHAR2,
                                                       --任务ID  传入是序列 para_task_seq.nextval
                                                       i_Inputdate IN DATE,
                                                       --日期
                                                       i_Tabname IN VARCHAR2,
                                                       --更新表名
                                                       i_Column IN VARCHAR2,
                                                       --更新列名
                                                       i_Tasktp IN VARCHAR2 DEFAULT 'BY ROWID'
                                                       -- 任务类型 未启用的,可设置不能类型的 update
                                                       -- 如: 如不同类型的分块方法  by rowid , by number_col ,by sql
                                                       ) IS
/*******************************************************************
   PROC_NAME : 并行更新过程,本例采用BY ROWID 方法
   EDIT_NAME : zzd @2014-03-26
   REQUIREMENT :
            1) ORACLE 11g R2 版本
            2) DBMS_PARALLEL_EXECUTE
            3) OWN CREATE JOB PRIVILEGE  eg: GRANT EXECUTE ON DBMS_SCHEDULER TO USER ;
            4) SET JOB_QUEUE_PROCESSES
    REFERENCE:
            (1) USER_PARALLEL_EXECUTE_CHUNKS
            (2) USER_PARALLEL_EXECUTE_TASKS
            (3) DBMS_PARALLEL_EXECUTE.DROP_TASK(TASK_NAME);
    ATTACH :
           (1) 分块范围 Chunk_Size 和并行度 Paralle_Level 可以根据服务器性能自行设置
           (2) DBMS_PARALLEL_EXECUTE 包三种 (BY ROWID,  BY NUMBER_COL ,BY SQL ) 分块方法,原理差不多
           (3) 更多说明参考PLSQL Packages and Types Reference_11R2.pdf
   ********************************************************************/
   --- 任务名常量
   v_Taskname VARCHAR2(20) := 'UPDATE_TASK';
   --- schema常量
   v_Schema CONSTANT VARCHAR2(20) := 'USER';
   --- rowid范围常量
   v_Cksize CONSTANT INT := 8000;
   --- 并行度大小
   v_Plevel CONSTANT INT := 10;
   --- 日期变量
   v_Inputdate VARCHAR2(64);
   --- 临时动态sql 变量
   v_Sql_Stmt VARCHAR2(4000);
   --- 临时动态insert_sql 变量
   v_Sql_Insert VARCHAR2(4000);
   --- 控制尝试运行次数
   v_Trynum NUMBER;
   --- 返回运行状态
   v_Status NUMBER;
BEGIN
   v_Trynum := 0;
   --- 日期转换
   v_Inputdate := 'TO_DATE(''' || To_Char(i_Inputdate, 'YYYY-MM-DD') ||
                  ''',''' || 'YYYY-MM-DD' || ''')';
   --- 写日志开始更新
   Dbms_Output.Put_Line('开始运行 ' || ' USER = ' || v_Schema ||
                        ' Table_Name= ' || i_Tabname ||
                        ' Update_Column = ' || i_Column || ' 任务类型为: ' ||
                        i_Tasktp);
   --- 注意这里只能用实体表作为临时表,此处称“中间表”否则在创建task 这步会把临时表的数据情况,事务性和会话性临时表均不可以
   --- 清空 中间表数据
   EXECUTE IMMEDIATE ' Truncate TABLE User_Table_Temp '; -- 注这个是个实体中间表
   --- 动态 insert sql 语句
   v_Sql_Insert := ' INSERT INTO User_Table_Temp(Table_Name, Status)
      SELECT a.Table_Name, a.Status
        FROM User_Tables a
       WHERE a.Table_Name LIKE ' || '''' ||
                   ' USER_NAME% ' || '''';

   --- 执行动态sql
   EXECUTE IMMEDIATE v_Sql_Insert;
   --- 测试动态sql
   Dbms_Output.Put_Line(v_Sql_Insert);

   ---  创建任务
   v_Taskname := v_Taskname || ' _ ' || i_Taskid;
   Dbms_Parallel_Execute.Create_Task(v_Taskname);

   --- 使用 ROWID 进行范围分组
   Dbms_Parallel_Execute.Create_Chunks_By_Rowid(Task_Name => v_Taskname,
                                                Table_Owner => Upper(v_Schema),
                                                Table_Name => Upper(i_Tabname),
                                                By_Row => TRUE,
                                                Chunk_Size => v_Cksize);

   --- 动态update的sql 脚本
   v_Sql_Stmt := ' UPDATE /*+ ROWID (dda) */  ' || Upper(i_Tabname) || ' b 
              SET b.' || i_Column;

   v_Sql_Stmt := v_Sql_Stmt || ' = (SELECT a.TABLE_NAME|| A.STATUS
                FROM User_Table_Temp a
               WHERE b. Object_Name' ||
                 ' = a.TABLE_NAME
                 AND b.Created = ' || v_Inputdate || ') ';

   v_Sql_Stmt := v_Sql_Stmt || '
       WHERE EXISTS (SELECT 1
                FROM User_Table_Temp a
               WHERE b.Object_Name ' ||
                 ' = a.TABLE_NAME
                 AND b.Created = ' || v_Inputdate || ')';

   v_Sql_Stmt := v_Sql_Stmt || Chr(10) ||
                 'AND ROWID BETWEEN :Start_Id AND :End_Id ';

   --- 测试动态SQL
   Dbms_Output.Put_Line(v_Sql_Stmt);
   --- 执行并行更新
   Dbms_Parallel_Execute.Run_Task(Task_Name => v_Taskname,
                                  Sql_Stmt => v_Sql_Stmt,
                                  Language_Flag => Dbms_Sql.Native,
                                  Parallel_Level => v_Plevel);
   --- 更新结束
   Dbms_Output.Put_Line('完成运行 ' || ' USER = ' || v_Schema ||
                        ' Table_Name=  ' || i_Tabname ||
                        '  Update_Column = ' || i_Column || ' 任务类型为: ' ||
                        i_Tasktp);

   --- 如果 task任务出差,则恢复它,并重新执行,如此进行尝试两次
   v_Status := Dbms_Parallel_Execute.Task_Status(v_Taskname);

   WHILE (v_Trynum < 2 AND v_Status != Dbms_Parallel_Execute.Finished) LOOP
      v_Trynum := v_Trynum + 1;
      --- 恢复任务,尝试继续执行
      Dbms_Parallel_Execute.Resume_Task(v_Taskname);
      --- 获取当前任务状态
      v_Status := Dbms_Parallel_Execute.Task_Status(v_Taskname);
   
   END LOOP;
   -- 更新完毕后,删除此任务
   Dbms_Parallel_Execute.Drop_Task(v_Taskname);

END Pro_Parallel_Exec_Update;

 三、相关视图

  (1)USER_PARALLEL_EXECUTE_CHUNKS

  (2)USER_PARALLEL_EXECUTE_TASKS

原文地址:https://www.cnblogs.com/oracle-dba/p/3632361.html