hive不支持多个字符作为分隔符的解决方案

题记:

  近期在做某个大型银行的大数据项目,当在处理非结构化数据时,却发现他们给的数据并不符合hive和pig的处理要求,数据每行必须需要多个分割符才能完美处理,一下午也没有想到完美的办法解决,今天重新审视了一下整个过程。看来hive的命令行没法搞定了。于是乎,只能通过代码来搞定。

1、重新实现hive的InputFormat了,别急放码过来

package hiveStream;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MyHiveInputFormat  extends TextInputFormat implements JobConfigurable{

	@Override
	public RecordReader<LongWritable, Text> getRecordReader(
			InputSplit genericSplit, JobConf job, Reporter reporter)
			throws IOException {
		 reporter.setStatus(genericSplit.toString());
	       return new MyRecordReader((FileSplit) genericSplit, job);
	}
	
}

2、仔细看看下面的方法,不解释,自己领悟。

package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;


public class MyRecordReader implements RecordReader<LongWritable, Text>{
	
	private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;
	
    public MyRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }
    
    public MyRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt(
                "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.end = endOffset;
    }
    
    // 构造方法
    public MyRecordReader(FileSplit inputSplit, Configuration job)
            throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // 创建压缩器
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // 打开文件系统
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;
 
        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
 
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }
	
    @Override
	public void close() throws IOException {
    	if (lineReader != null)
            lineReader.close();
	}

	@Override
	public LongWritable createKey() {
		return new LongWritable();
	}

	@Override
	public Text createValue() {
		 return new Text();
	}

	@Override
	public long getPos() throws IOException {
		  return pos;
	}

	@Override
	public float getProgress() throws IOException {
	   if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
	}

	@Override
	public boolean next(LongWritable key, Text value) throws IOException {
		 while (pos < end) {
	            key.set(pos);
	            int newSize = lineReader.readLine(value, maxLineLength,
	                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
	                            maxLineLength));
	            // 把字符串中的"##"转变为"#"
	            String strReplace = value.toString().replaceAll("\s+", "01");
	            Text txtReplace = new Text();
	            txtReplace.set(strReplace);
	            value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
	            if (newSize == 0)
	                return false;
	            pos += newSize;
	            if (newSize < maxLineLength)
	                return true;
	        }
	        return false;
	    }
	}

3、处理实例:如下

     

数据处理要求:
    
    12 afd   fewf	fewfe  we
    76 vee   ppt	wfew  wefw
    83 tyutr   ppt	wfew  wefw
    45 vbe   ppt	wfew  wefw
    565 wee   ppt	wfew  wefw
    12 sde   ppt	wfew  wefw
注意:字段之间的空格不一致

1、建表:
    create table micmiu_blog(author int, category string, url string,town string,oop string) stored as inputformat 'hiveStream.MyHiveInputFormat' outputformat  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
注意:输出咱可没有重写哦

2、加载数据:
    LOAD DATA LOCAL INPATH'/mnt/test' OVERWRITE INTO TABLE micmiu_blog;

3、看看的成果:
    select * from micmiu_blog;

自己去试试,不解释

  

原文地址:https://www.cnblogs.com/hiter-java/p/4820773.html