寒假学习(四)编写MapReduce程序清洗信件内容数据

对爬取到的数据进行清洗,按照一定的规则把“脏数据”“洗掉”。

数据清洗是对数据进行重新审查和校验的过程,目的在于删除重复信息、纠正存在的错误,并提供数据一致性。

import java.io.BufferedReader;  
import java.io.InputStreamReader;  
  
import java.io.IOException;  
  
import org.apache.hadoop.fs.FSDataInputStream;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.JobContext;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  
public class FileRecordReader extends RecordReader<text,text>{  
  
    private FileSplit fileSplit;  
    private JobContext jobContext;  
    private Text currentKey = new Text();  
    private Text currentValue = new Text();  
    private boolean finishConverting = false;  
    @Override  
    public void close() throws IOException {  
  
  
    @Override  
    public Text getCurrentKey() throws IOException, InterruptedException {  
        return currentKey;  
    }  
  
    @Override  
    public Text getCurrentValue() throws IOException,  
            InterruptedException {  
        return currentValue;  
    }  
  
    @Override  
    public float getProgress() throws IOException, InterruptedException {  
        float progress = 0;  
        if(finishConverting){  
            progress = 1;  
        }  
        return progress;  
    }  
  
    @Override  
    public void initialize(InputSplit arg0, TaskAttemptContext arg1)  
            throws IOException, InterruptedException {  
        this.fileSplit = (FileSplit) arg0;  
        this.jobContext = arg1;  
        String filename = fileSplit.getPath().getName();  
        this.currentKey = new Text(filename);  
    }  
  
    @Override  
    public boolean nextKeyValue() throws IOException, InterruptedException {  
        if(!finishConverting){  
            int len = (int)fileSplit.getLength();  
//          byte[] content = new byte[len];  
            Path file = fileSplit.getPath();  
            FileSystem fs = file.getFileSystem(jobContext.getConfiguration());  
            FSDataInputStream in = fs.open(file);  
//根据实际网页的编码格式修改  
 //         BufferedReader br = new BufferedReader(new InputStreamReader(in,"gbk"));  
            BufferedReader br = new BufferedReader(new InputStreamReader(in,"utf-8"));  
            String line="";  
            String total="";  
            while((line= br.readLine())!= null){  
                total =total+line+"
";  
            }  
            br.close();  
            in.close();  
            fs.close();  
            currentValue new Text(total);  
            finishConverting true;  
            return true;  
        }  
        return false;  
    }  
  
}  

 完整代码:

import java.io.IOException;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import cn.wanghaomiao.xpath.exception.XpathSyntaxErrorException;  
import cn.wanghaomiao.xpath.model.JXDocument;  
  
public class QingxiHtml {  
    public static class doMapper extends Mapper<object, text,="" text=""> {  
        public static final IntWritable one = new IntWritable(1);  
        public static Text word = new Text();  
  
        @Override  
        protected void map(Object key, Text value, Context context)  
                throws IOException, InterruptedException {  
            String htmlStr = value.toString();  
            JXDocument Document new JXDocument(htmlStr);  
            if (htmlStr.indexOf("mail_track_h2") > 0) {  
                try {  
                    String leixing = Document  
                            .sel("//span[@class='font12 gray']/a[2]/text()")  
                            .get(0).toString();  
                    String biaoti = Document  
                            .sel("//h2[@class='mail_track_h2']/text()").get(0)  
                            .toString();  
                    String leixinren = Document  
                            .sel("//p[@class='font12 gray time_mail']/span[1]/text()")  
                            .get(0).toString().replaceAll("来信人:", "");  
                    String shijian = Document  
                            .sel("//p[@class='font12 gray time_mail']/span[2]/text()")  
                            .get(0).toString().replaceAll("时间:", "");  
                    String number = Document  
                            .sel("//p[@class='font12 gray time_mail']/span[3]/allText()")  
                            .get(0).toString().replace("网友同问: ", "").replace("网友评价数: ", "");  
                    String problem = Document  
                            .sel("//span[@class='font14 mail_problem']/text()")  
                            .get(0).toString();  
                    if (htmlStr.indexOf("margin-bottom:31px") > 0) {  
                    String offic = Document  
                                .sel("//div[@class='con_left float_left']/div[2]/span[1]/text()")  
                                .get(0).toString();  
                    String officpt = Document  
                                .sel("//div[@class='con_left float_left']/div[2]/span[2]/text()")  
                                .get(0).toString();  
  
                        String officp = Document  
                                .sel("//div[@class='con_left float_left']/div[2]/p[1]/text()")  
                                .get(0).toString();  
                    String dataout = leixing + "	" + biaoti + "	"  
                                + leixinren + "	" + shijian + "	" + number  
                                + "	" + problem + "	" + offic + "	"  
                                + officpt + "	"+ officp;  
                        System.out.println(dataout);  
                        Text oneLines new Text(dataout);  
                        context.write(oneLines, new Text(""));  
                } else {  
                        String dataout = leixing + "	" + biaoti + "	"  
                                + leixinren + "	" + shijian + "	" + number  
                                + "	" + problem;  
                        System.out.println(dataout);  
                        Text oneLines new Text(dataout);  
                        context.write(oneLines, new Text(""));  
                    }  
  
                } catch (XpathSyntaxErrorException e) {  
                    // TODO Auto-generated catch block  
                    e.printStackTrace();  
                }  
            }  
        }  
    }  
  
    public static void main(String[] args) throws IOException,  
            ClassNotFoundException, InterruptedException {  
        Job job = Job.getInstance();  
        job.setJobName("QingxiHtml");  
        job.setJarByClass(QingxiHtml.class);  
        job.setMapperClass(doMapper.class);  
  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
        job.setInputFormatClass(FileInput.class);  
        Path in new Path("hdfs://localhost:9000//myedu2/in");  
        Path out new Path("hdfs://localhost:9000//myedu2/out/1");  
        FileInputFormat.addInputPath(job, in);  
        FileOutputFormat.setOutputPath(job, out);  
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }  
}  

 

原文地址:https://www.cnblogs.com/sonofdemon/p/12253401.html