Sqoop-1.4.6 Merge源码分析与改造使其支持多个merge-key

  Sqoop中提供了一个用于合并数据集的工具sqoop-merge。官方文档中的描述可以参考我的另一篇博客Sqoop-1.4.5用户手册
  Merge的基本原理是,需要指定新数据集和老数据集的路径,根据某个merge-key,在reduce过程中,优先取出新数据集中的数据,共同合并成新的全量数据。具体的逻辑分析可以稍后通过看Sqoop-1.4.6的源码来进一步了解。
  但是,在原生的Sqoop中,目前只支持merge-key为一个字段的情况,本文通过分析源代码并对源代码进行更改,可以在使用Sqoop的Merge功能时支持任意多个merge-key。

一、Sqoop Merge Tool使用示例

  在这里模拟一次数据增量同步到hive中的过程。

1、数据准备

  有一张分区表sqoop_merge_all,分区字段pt,在hdfs上的文件存储路径为:hdfs://m000/user/hive/warehouse/sqoop_merge_all,表结构如下:

字段 类型
id int
type string
comments string
updatetime string

  另外有一张增量表sqoop_merge_inc,该表的hdfs路径为hdfs://m000/user/hive/warehouse/sqoop_merge_inc
  看一下这两张表中的数据:
sqoop_merge_all (pt=20160810)
  这里写图片描述
假设这里存的是20160810的全量数据。
sqoop_merge_inc
  这里写图片描述
假设20160811当天,对type=type1的记录进行了更新,并且新增了一条type=type3的记录。
  现在,需要把sqoop_merge_inc中的两条记录与sqoop_merge_all分区 (pt=20160810)中的两条记录进行合并,合并后的数据存入sqoop_merge_all的分区 (pt=20160811)
  按照正常逻辑,id=1的那条记录被更新成id=3的那一条,id=4的记录新增,那么最终sqoop_merge_all的分区 (pt=20160811)中应该包含的记录分别为id=2,id=3,id=4这三条。

2、Sqoop Merge操作

  完整的sqoop merge命令如下:

sqoop merge 
  --new-data hdfs://m000/user/hive/warehouse/sqoop_merge_inc 
  --onto hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160810 
  --target-dir hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160811 
  --jar-file /usr/local/sqoop/bindir/sqoop_merge.jar 
  --class-name sqoop_merge 
  --merge-key type

  简单说明一下,上面这句命令中表示,将新的数据集(参数--new-data)与已有的数据集(参数--onto)进行合并,合并后的数据存入(参数--target-dir)路径下。需要指定这次合并中使用的表的结构jar包和class。根据type字段进行合并。
  最终结果如下,与上面期望的相一致。
  这里写图片描述

  我们看一眼sqoop_merge.jar以及sqoop_merge.class的内容,基本上可以理解成这个类是sqoop_merge表的一个java bean定义。
  这里写图片描述

二、Sqoop Merge Tool实现原理

  那么,Sqoop是如何实现上面这个功能的呢?

1、脚本分析

  在$SQOOP_HOME/bin路径下,有一个sqoop-merge脚本。
(1)sqoop-merge
  该脚本主要逻辑就是调用了sqoop merge命令,并把类似于--onto之类的参数都传入。

prgm=`readlink -f $0`
bin=`dirname ${prgm}`
bin=`cd ${bin} && pwd`

exec ${bin}/sqoop merge "$@" 

(2)sqoop
  该脚本中,对参数进行处理后,最终执行下面这一句。把merge也一并传入。

source ${bin}/configure-sqoop "${bin}"
exec ${HADOOP_COMMON_HOME}/bin/hadoop org.apache.sqoop.Sqoop "$@"

2、Java源代码

  查看Sqoop源代码,可以看到Sqoop是使用Java语言实现的。我们首先找到上面脚本中使用的类。由于每个类中的方法调用过程比较麻烦,接下来只分析主要代码,首先,完整的调用链如下所示。

org.apache.sqoop.Sqoop#main
--> org.apache.sqoop.Sqoop#runTool(args) 
--> org.apache.sqoop.Sqoop#runTool(args, new Configuration())
--> org.apache.sqoop.Sqoop#runSqoop(Sqoop sqoop, args[1...end])
--> org.apache.hadoop.util.ToolRunner.run()
--> tool.run()

  最后这个tool的类型会详细分析。
  
(1)org.apache.sqoop.Sqoop
  这个类是整个调用链的入口,这个类中主要方法的逻辑如下。
  在第二个runTool方法调用处,会根据传入的第一个参数(即我们传入的”merge”),生成一个SqoopTool类型的tool对象。

  public static int runTool(String [] args, Configuration conf) {
      ...
      String toolName = expandedArgs[0];
      Configuration pluginConf = SqoopTool.loadPlugins(conf);
      SqoopTool tool = SqoopTool.getTool(toolName);
      ...
      Sqoop sqoop = new Sqoop(tool, pluginConf);
      // 除去"merge"参数之外的其他参数,一起传入runSqoop中
      return runSqoop(sqoop, Arrays.copyOfRange(expandedArgs, 1, expandedArgs.length));
  }

  那么这个tool对象到底是个什么呢?跟踪进入com.cloudera.sqoop.tool#getTool方法,进入org.apache.sqoop.tool.SqoopTool#getTool方法。

(2)org.apache.sqoop.tool.SqoopTool
  这里面,是从一个名为TOOLS的Map中,根据toolName获取对应的类对象。从下面代码中我们可以看到Sqoop支持的所有Tool,并且merge对应的Tool是MergeTool类型的。

private static final Map<String, Class<? extends SqoopTool>> TOOLS;
TOOLS = new TreeMap<String, Class<? extends SqoopTool>>();
...
// registerTool方法最终都会向TOOLS中put一个对象
registerTool("codegen", CodeGenTool.class,
    "Generate code to interact with database records");
registerTool("create-hive-table", CreateHiveTableTool.class,
    "Import a table definition into Hive");
registerTool("eval", EvalSqlTool.class,
    "Evaluate a SQL statement and display the results");
registerTool("export", ExportTool.class,
    "Export an HDFS directory to a database table");
registerTool("import", ImportTool.class,
    "Import a table from a database to HDFS");
registerTool("import-all-tables", ImportAllTablesTool.class,
    "Import tables from a database to HDFS");
registerTool("import-mainframe", MainframeImportTool.class,
        "Import datasets from a mainframe server to HDFS");
registerTool("help", HelpTool.class, "List available commands");
registerTool("list-databases", ListDatabasesTool.class,
    "List available databases on a server");
registerTool("list-tables", ListTablesTool.class,
    "List available tables in a database");
registerTool("merge", MergeTool.class,
    "Merge results of incremental imports");
registerTool("metastore", MetastoreTool.class,
    "Run a standalone Sqoop metastore");
registerTool("job", JobTool.class,
    "Work with saved jobs");
registerTool("version", VersionTool.class,
    "Display version information");

  最前面那个调用链最后那个tool对象,对应的就是MergeTool类型。那么接下来我们进入到MergeTool#run中。

(3)org.apache.sqoop.tool.MergeTool
  在这个方法中,生成一个MergeJob对象,然后通过该mergeJob的runMergeJob方法,运行一个MapReduce任务。

  public int run(SqoopOptions options) {
    try {
      // Configure and execute a MapReduce job to merge these datasets.
      MergeJob mergeJob = new MergeJob(options);
      if (!mergeJob.runMergeJob()) {
        LOG.error("MapReduce job failed!");
        return 1;
      }
    } catch (IOException ioe) {
      ...
    }
    return 0;
  }

(4)org.apache.sqoop.mapreduce.MergeJob
  这个类的runMergeJob方法是一个标准的MapReduce程序。我们主要跟踪其Mapper类和Reducer类。

  public boolean runMergeJob() throws IOException {
      ...
      if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapperClass(MergeRecordMapper.class);
      } else {
        job.setMapperClass(MergeTextMapper.class);
        job.setOutputFormatClass(RawKeyTextOutputFormat.class);
      }
      ...
      job.setReducerClass(MergeReducer.class);
  }

  使用的Reducer类是MergeReducer,根据文件类型分别生成MergeRecordMapperMergeTextMapper类型的Mapper类。但是,不管表文件类型是什么,这两个Mapper类,最终共同继承了MergeMapperBase类,并且在各自的map方法中,调用了MergeMapperBase#processRecord方法,map阶段的主要逻辑也就在该方法中。

(5)org.apache.sqoop.mapreduce.MergeMapperBase
  这里我们只分析processRecord方法。
  在这个方法中我们看到,每一条记录对应一个MergeRecord对象,这个对象最后会在map的输出中输出到reduce阶段。fieldMap是一个Map类型,其key就是我们通过参数--merge-key指定的字段,根据该字段名称,从fieldMap中取出当前记录该字段的值,转化成String后,当作map的输出,与MergeRecord对象一起交给Reduce来处理。

  protected void processRecord(SqoopRecord r, Context c)
      throws IOException, InterruptedException {
    MergeRecord mr = new MergeRecord(r, isNew);
    Map<String, Object> fieldMap = r.getFieldMap();
    if (null == fieldMap) {
      throw new IOException("No field map in record " + r);
    }
    Object keyObj = fieldMap.get(keyColName);
    if (null == keyObj) {
      throw new IOException("Cannot join values on null key. "
          + "Did you specify a key column that exists?");
    } else {
      c.write(new Text(keyObj.toString()), mr);
    }
  }

(6)org.apache.sqoop.mapreduce.MergeReducer
  这个类中,reduce方法的逻辑如下。  
  取出MergeRecord集合中相同key的所有记录,如果新数据集中不包含当前字段值的记录,则从旧的数据集中取该条记录。如果新旧数据集中都有该记录,则从新的数据集中取出该记录。

  public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
      throws IOException, InterruptedException {
    SqoopRecord bestRecord = null;
    try {
      for (MergeRecord val : vals) {
        if (null == bestRecord && !val.isNewRecord()) {
          // Use an old record if we don't have a new record.
          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
        } else if (val.isNewRecord()) {
          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
        }
      }
    } catch (CloneNotSupportedException cnse) {
      throw new IOException(cnse);
    }

    if (null != bestRecord) {
      c.write(bestRecord, NullWritable.get());
    }
  }

三、Sqoop Merge Tool源码修改

  从上面源代码过程分析可以看到,merge过程只能指定一个字段,如果指定多个字段时,会报如下的错,提示当前指定的字段不存在。

16/08/22 15:54:15 INFO mapreduce.Job: Task Id : attempt_1470135750174_2508_m_000004_2, Status : FAILED
Error: java.io.IOException: Cannot join values on null key. Did you specify a key column that exists?
    at org.apache.sqoop.mapreduce.MergeMapperBase.processRecord(MergeMapperBase.java:79)
    at org.apache.sqoop.mapreduce.MergeTextMapper.map(MergeTextMapper.java:58)
    at org.apache.sqoop.mapreduce.MergeTextMapper.map(MergeTextMapper.java:34)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

  并且我们发现使用merge key对记录进行合并主要发生在map阶段,所以如果需要支持多个字段的merge时我们只需要修改MergeMapperBase#processRecord方法即可。修改后的代码如下
  在使用sqoop merge时,多个字段用逗号分隔,把每个字段对应的值取出来拼接成新的key。

    protected void processRecord(SqoopRecord r, Context c)
            throws IOException, InterruptedException {
        MergeRecord mr = new MergeRecord(r, isNew);
        Map<String, Object> fieldMap = r.getFieldMap();
        if (null == fieldMap) {
            throw new IOException("No field map in record " + r);
        }
        Object keyObj = null;
        if (keyColName.contains(",")) {
            String connectStr = new String(new byte[]{1});
            StringBuilder keyFieldsSb = new StringBuilder();
            for (String str : keyColName.split(",")) {
                keyFieldsSb.append(connectStr).append(fieldMap.get(str).toString());
            }
            keyObj = keyFieldsSb;
        } else {
            keyObj = fieldMap.get(keyColName);
        }

        if (null == keyObj) {
            throw new IOException("Cannot join values on null key. "
                    + "Did you specify a key column that exists?");
        } else {
            c.write(new Text(keyObj.toString()), mr);
        }
    }

  上面需要注意的一点是,我的拼接符使用了一个byte的String,这样可以避免以下这种情况。
  假设使用“+”当拼接符,如果存在两条记录:

Field a Field b
a+ b
a +b

  使用字段a,b进行merge时,上面两条不一样的记录最终会被程序认为是相同的,由此会产生新的数据不准确问题。
  
  有关该问题的更多信息可以参考[SQOOP-3002]

四、多字段的merge

  还是以上面两张表为例进行测试,表sqoop_merge_all使用两个新的分区pt=20160801,pt=20160802
  sqoop_merge_all (pt=20160801)数据
  这里写图片描述
  sqoop_merge_inc数据
  这里写图片描述
  指定--merge-key type,comments进行merge,理论上只有id=1的那一条记录被更新成id=5的那一条。合并后的数据应该包含id=2,id=3,id=4,id=5这四条记录。

sqoop merge 
  --new-data hdfs://m000/user/hive/warehouse/sqoop_merge_inc 
  --onto hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160801 
  --target-dir hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160802 
  --jar-file /usr/local/sqoop/bindir/sqoop_merge.jar 
  --class-name sqoop_merge 
  --merge-key type,comments

  merge后,最终sqoop_merge_all (pt=20160802)的数据为:
  这里写图片描述

原文地址:https://www.cnblogs.com/wuyida/p/6300229.html