MAPREDUCE的实战案例

reducejoin算法实现

1、需求:

订单数据表t_order

id

date

pid

amount

1001

20150710

P0001

2

1002

20150710

P0001

3

1002

20150710

P0002

3

商品信息表t_product

id

pname

category_id

price

P0001

小米5

1000

2

P0002

锤子T1

1000

3

假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算: 

select  a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id

2、实现机制:

通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联

public class OrderJoin {

    static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, OrderJoinBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // 拿到一行数据,并且要分辨出这行数据所属的文件
            String line = value.toString();

            String[] fields = line.split("	");

            // 拿到itemid
            String itemid = fields[0];

            // 获取到这一行所在的文件名(通过inpusplit)
            String name = "你拿到的文件名";

            // 根据文件名,切分出各字段(如果是a,切分出两个字段,如果是b,切分出3个字段)

            OrderJoinBean bean = new OrderJoinBean();
            bean.set(null, null, null, null, null);
            context.write(new Text(itemid), bean);

        }

    }

    static class OrderJoinReducer extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<OrderJoinBean> beans, Context context) throws IOException, InterruptedException {
            
             //拿到的key是某一个itemid,比如1000
            //拿到的beans是来自于两类文件的bean
            //  {1000,amount} {1000,amount} {1000,amount}   ---   {1000,price,name}
            
            //将来自于b文件的bean里面的字段,跟来自于a的所有bean进行字段拼接并输出
        }
    }
}

缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜

解决方案: mapjoin实现方式

1、原理阐述

适用于关联表中有小表的情形;

可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度

2、实现示例

--先在mapper类中预先定义好小表,进行join

--引入实际场景中的解决方案:一次加载数据库或者用distributedcache

public class TestDistributedCache {
    static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{
        FileReader in = null;
        BufferedReader reader = null;
        HashMap<String,String> b_tab = new HashMap<String, String>();
        String localpath =null;
        String uirpath = null;
        
        //是在map任务初始化的时候调用一次
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //通过这几句代码可以获取到cache file的本地绝对路径,测试验证用
            Path[] files = context.getLocalCacheFiles();
            localpath = files[0].toString();
            URI[] cacheFiles = context.getCacheFiles();
            
            
            //缓存文件的用法——直接用本地IO来读取
            //这里读的数据是map task所在机器本地工作目录中的一个小文件
            in = new FileReader("b.txt");
            reader =new BufferedReader(in);
            String line =null;
            while(null!=(line=reader.readLine())){
                
                String[] fields = line.split(",");
                b_tab.put(fields[0],fields[1]);
                
            }
            IOUtils.closeStream(reader);
            IOUtils.closeStream(in);
            
        }
        
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            //这里读的是这个map task所负责的那一个切片数据(在hdfs上)
             String[] fields = value.toString().split("	");
             
             String a_itemid = fields[0];
             String a_amount = fields[1];
             
             String b_name = b_tab.get(a_itemid);
             
             // 输出结果  1001    98.9    banan
             context.write(new Text(a_itemid), new Text(a_amount + "	" + ":" + localpath + "	" +b_name ));
             
        }
        
        
    }
    
    
    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(TestDistributedCache.class);
        
        job.setMapperClass(TestDistributedCacheMapper.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        
        //这里是我们正常的需要处理的数据所在路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //不需要reducer
        job.setNumReduceTasks(0);
        //分发一个文件到task进程的工作目录
        job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
        
        //分发一个归档文件到task进程的工作目录
//        job.addArchiveToClassPath(archive);

        //分发jar包到task节点的classpath下
//        job.addFileToClassPath(jarfile);
        
        job.waitForCompletion(true);
    }
}

web日志预处理

1、需求:

web访问日志中的各字段识别切分

去除日志中不合法的记录

根据KPI统计需求,生成各类访问请求过滤数据

2、实现代码:

a) 定义一个bean,用来记录日志数据中的各数据字段

public class WebLogBean {
    
    private String remote_addr;// 记录客户端的ip地址
    private String remote_user;// 记录客户端用户名称,忽略属性"-"
    private String time_local;// 记录访问时间与时区
    private String request;// 记录请求的url与http协议
    private String status;// 记录请求状态;成功是200
    private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
    private String http_referer;// 用来记录从那个页面链接访问过来的
    private String http_user_agent;// 记录客户浏览器的相关信息

    private boolean valid = true;// 判断数据是否合法

    
    
    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return time_local;
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }
    
    
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("01").append(this.remote_addr);
        sb.append("01").append(this.remote_user);
        sb.append("01").append(this.time_local);
        sb.append("01").append(this.request);
        sb.append("01").append(this.status);
        sb.append("01").append(this.body_bytes_sent);
        sb.append("01").append(this.http_referer);
        sb.append("01").append(this.http_user_agent);
        return sb.toString();
}
}

b)定义一个parser用来解析过滤web访问日志原始记录

public class WebLogParser {
    public static WebLogBean parser(String line) {
        WebLogBean webLogBean = new WebLogBean();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setRemote_addr(arr[0]);
            webLogBean.setRemote_user(arr[1]);
            webLogBean.setTime_local(arr[3].substring(1));
            webLogBean.setRequest(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setBody_bytes_sent(arr[9]);
            webLogBean.setHttp_referer(arr[10]);
            
            if (arr.length > 12) {
                webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                webLogBean.setHttp_user_agent(arr[11]);
            }
            if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误
                webLogBean.setValid(false);
            }
        } else {
            webLogBean.setValid(false);
        }
        return webLogBean;
    }
   
    public static String parserTime(String time) {
        
        time.replace("/", "-");
        return time;
        
    }
}

c) mapreduce程序

public class WeblogPreProcess {

    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Text k = new Text();
        NullWritable v = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            WebLogBean webLogBean = WebLogParser.parser(line);
            if (!webLogBean.isValid())
                return;
            k.set(webLogBean.toString());
            context.write(k, v);

        }

    }

    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(WeblogPreProcess.class);
        
        job.setMapperClass(WeblogPreProcessMapper.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        job.waitForCompletion(true);
        
    }
}

流量统计相关需求

1、对流量日志中的用户统计总上、下行流量技术点: 自定义javaBean用来在mapreduce中充当value

注意: javaBean要实现Writable接口,实现两个方法

    //序列化,将对象的字段信息写入输出流
    @Override
    public void write(DataOutput out) throws IOException {
        
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
        
    }

    //反序列化,从输入流中读取各个字段信息
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
        
    }

1、统计流量且按照流量大小倒序排序

技术点:这种需求,用一个mapreduce -job 不好实现,需要两个mapreduce -job

第一个job负责流量统计,跟上题相同

第二个job读入第一个job的输出,然后做排序

要将flowBean作为mapkey输出,这样mapreduce就会自动排序 此时,flowBean要实现接口WritableComparable   要实现其中的compareTo()方法,方法中,我们可以定义倒序比较的逻辑

1、统计流量且按照手机号的归属地,将结果数据输出到不同的省份文件中技术点:自定义Partitioner

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        
        String prefix = key.toString().substring(0,3);
        Integer partNum = pmap.get(prefix);
        
        return (partNum==null?4:partNum);
    }

自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task

job.setNumReduceTasks(5);

注意:如果reduceTask的数量>= getPartition的结果数  ,则会多产生几个空的输出文件part-r-000xx

如果     1<reduceTask的数量<getPartition的结果数 ,则有一部分分区数据无处安放,会Exception!!!

如果 reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000

社交粉丝数据分析

以下是qq的好友列表数据,冒号前是一个用,冒号后是该用户的所有好友(数据中的好友关系是单向的)

A:B,C,D,F,E,O

B:A,C,E,K

C:F,A,D,I

D:A,E,F,L

E:B,C,D,M,L

F:A,B,C,D,E,O,M

G:A,C,D,E,F

H:A,C,D,E,O

I:A,O

J:B,O

K:A,C,D

L:D,E,F

M:E,F,G

O:A,H,I,J

求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?

解题思路:

求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?

解题思路:

第一步  

map

读一行   A:B,C,D,F,E,O

输出    <B,A><C,A><D,A><F,A><E,A><O,A>

在读一行   B:A,C,E,K

输出   <A,B><C,B><E,B><K,B>

REDUCE

拿到的数据比如<C,A><C,B><C,E><C,F><C,G>......

输出:  

<A-B,C>

<A-E,C>

<A-F,C>

<A-G,C>

<B-E,C>

<B-F,C>.....

第二步

map

读入一行<A-B,C>

直接输出<A-B,C>

reduce

读入数据  <A-B,C><A-B,F><A-B,G>.......

输出: A-B  C,F,G,.....

package cn.itcast.bigdata.mr.fensi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SharedFriendsStepOne {

    static class SharedFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // A:B,C,D,F,E,O
            String line = value.toString();
            String[] person_friends = line.split(":");
            String person = person_friends[0];
            String friends = person_friends[1];

            for (String friend : friends.split(",")) {

                // 输出<好友,人>
                context.write(new Text(friend), new Text(person));
            }

        }

    }

    static class SharedFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text friend, Iterable<Text> persons, Context context) throws IOException, InterruptedException {

            StringBuffer sb = new StringBuffer();

            for (Text person : persons) {
                sb.append(person).append(",");

            }
            context.write(friend, new Text(sb.toString()));
        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(SharedFriendsStepOne.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        job.setMapperClass(SharedFriendsStepOneMapper.class);
        job.setReducerClass(SharedFriendsStepOneReducer.class);

        FileInputFormat.setInputPaths(job, new Path("D:/srcdata/friends"));
        FileOutputFormat.setOutputPath(job, new Path("D:/temp/out"));

        job.waitForCompletion(true);

    }

}
package cn.itcast.bigdata.mr.fensi;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SharedFriendsStepTwo {

    static class SharedFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

        // 拿到的数据是上一个步骤的输出结果
        // A I,K,C,B,G,F,H,O,D,
        // 友 人,人,人
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String[] friend_persons = line.split("	");

            String friend = friend_persons[0];
            String[] persons = friend_persons[1].split(",");

            Arrays.sort(persons);

            for (int i = 0; i < persons.length - 1; i++) {
                for (int j = i + 1; j < persons.length; j++) {
                    // 发出 <人-人,好友> ,这样,相同的“人-人”对的所有好友就会到同1个reduce中去
                    context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
                }

            }

        }

    }

    static class SharedFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text person_person, Iterable<Text> friends, Context context) throws IOException, InterruptedException {

            StringBuffer sb = new StringBuffer();

            for (Text friend : friends) {
                sb.append(friend).append(" ");

            }
            context.write(person_person, new Text(sb.toString()));
        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(SharedFriendsStepTwo.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SharedFriendsStepTwoMapper.class);
        job.setReducerClass(SharedFriendsStepTwoReducer.class);

        FileInputFormat.setInputPaths(job, new Path("D:/temp/out/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("D:/temp/out2"));

        job.waitForCompletion(true);

    }

}

 

原文地址:https://www.cnblogs.com/duan2/p/7538049.html