使用MapReduce将HDFS数据导入到HBase(三)

使用MapReduce生成HFile文件,再通过BulkLoad方式(绕过WAL和MemStore的写入路径)批量加载到HBase表中

package com.mengyao.bigdata.hbase;

import java.io.IOException;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 
 * @author mengyao
* HBase-1.0.1.1、Hadoop-2.6.0 *
*/ public class BulkLoadApp { private static Configuration conf = HBaseConfiguration.create(); private static String inPath; private static String outPath; private static String tableName; static { conf.set("hbase.zookeeper.quorum", "bdata200,bdata202,bdata203"); conf.set("hbase.zookeeper.property.clientPort", "2181"); } static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { private ImmutableBytesWritable row; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //id,username,email,birthday,mobile,phone,modified String[] fields = line.split(" "); String id = fields[0]; String username = fields[1]; String mail = fields[2]; String birthday = fields[3]; String mobile = fields[4]; String phone = fields[5]; String regtime = fields[6]; String rowKey = DigestUtils.md5Hex(id); row = new ImmutableBytesWritable(Bytes.toBytes(rowKey)); Put put = new Put(Bytes.toBytes(rowKey), System.currentTimeMillis()); if (!StringUtils.isEmpty(id)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("id"), Bytes.toBytes(id)); } if (!StringUtils.isEmpty(username)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("username"), Bytes.toBytes(username)); } if (!StringUtils.isEmpty(mail)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mail"), Bytes.toBytes(mail)); } if (!StringUtils.isEmpty(birthday) || !birthday.equals("0000-00-00")) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes(birthday)); } if (!StringUtils.isEmpty(mobile)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mobile"), Bytes.toBytes(mobile)); } if (!StringUtils.isEmpty(phone)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(phone)); } if (!StringUtils.isEmpty(regtime)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("modified"), Bytes.toBytes(regtime)); } context.write(row, put); } } static int createJob(String[] 
args) throws Exception { inPath = args[0]; outPath = args[1]; tableName = args[2]; Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf(tableName)); Job job=Job.getInstance(conf); job.setJarByClass(BulkLoadApp.class); job.setMapperClass(BulkLoadMapper.class); job.setNumReduceTasks(0); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setOutputFormatClass(HFileOutputFormat2.class); HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf(tableName))); FileInputFormat.addInputPath(job,new Path(inPath)); FileOutputFormat.setOutputPath(job,new Path(outPath)); return job.waitForCompletion(true)?0:1; } /** * use commond: * 1、hadoop jar MyJar INPUT_FILE OUTPUT_DIR TABLE_NAME * hadoop jar bigdata.jar /tag/data/user/haier_user.csv /tag/data/user/haier_user_out tbl_shopuser * 2、hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles OUTPUT_DIR TABLE_NAME * hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tag/data/user/haier_user_out tbl_shopuser * @param args * @throws Exception */ @SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { if (args.length!=3) { System.out.println("Usage: "+BulkLoadApp.class.getName()+" Input paramters <INPUT_PATH> <OUTPUT_PATH> <TABLE_NAME>"); } else { int status = createJob(args); if (status == 0) { LoadIncrementalHFiles loadHFiles = new LoadIncrementalHFiles(conf); loadHFiles.doBulkLoad(new Path(outPath), new HTable(conf, Bytes.toBytes(tableName))); } System.exit(status); } } }
原文地址:https://www.cnblogs.com/mengyao/p/6774046.html