Map-side join
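
The listing below joins on the map side: each mapper loads the small product table into an in-memory HashMap in setup(), then enriches every order record as it streams through map(), so the job needs no reduce phase (and no skew-prone shuffle) at all. The record layouts are inferred from the parsing code; the sample rows are made up for illustration:

    order file (the job's input), one record per line: orderid,date,pid,amount
    1001,20170822,P0001,2
    1002,20170823,P0002,3

    product file (shipped via the distributed cache), columns pid,pname,...:
    P0001,xiaomi-phone,1999,c001
    P0002,xiaomi-backpack,99,c002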

package my.hadoop.hdfs.mapreduceJoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 /**
  * When the product table is small (a few dozen products, e.g. Xiaomi's catalog)
  * but the order table is huge (tens of millions of orders a year), routing each
  * product to its own reduce task causes data skew: the Xiaomi backpack may have
  * only tens of thousands of records while the Xiaomi phone has a million, so the
  * load is badly unbalanced. A map-side join sidesteps this by joining in the
  * mappers and skipping the reduce phase entirely.
  * @author lq
  */
public class MapsideJoin {
     
    public static class FindFriendMapper extends
            Mapper<LongWritable, Text, AllInfoBean, NullWritable> {

        // product id -> product name, loaded once per mapper from the cached product file
        Map<String, String> pdinfo = new HashMap<String, String>();

        @Override
        protected void setup(
                Mapper<LongWritable, Text, AllInfoBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // The cached product file is symlinked into the task's working
            // directory, so it can be opened by its plain file name.
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(new FileInputStream("product")));
            String line = null;
            while ((line = br.readLine()) != null) {
                // product records are comma-separated: pid,pname,...
                String[] split = line.split(",");
                pdinfo.put(split[0], split[1]);
            }
            // close the stream
            br.close();
        }
        // reused across map() calls to avoid creating a new bean per record
        AllInfoBean bean = new AllInfoBean();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input line is an order record: orderid,date,pid,amount
            String[] cols = value.toString().split(",");
            bean.setOderid(Integer.parseInt(cols[0]));
            bean.setDate(cols[1]);
            bean.setPid(cols[2]);
            bean.setAmount(Integer.parseInt(cols[3]));
            // the join itself: look up the product name by pid in the in-memory map
            bean.setPname(pdinfo.get(cols[2]) == null ? "" : pdinfo.get(cols[2]));
            bean.setPrice("");
            bean.setCategory_id("");

            context.write(bean, NullWritable.get());
        }
    }
    }

    // No reducer is needed: this is a map-only job. The reduce-side variant is
    // kept here, commented out, only for comparison.
    /*public static class FindFriendReducer extends
            Reducer<Text, AllInfoBean, AllInfoBean, NullWritable> {

        @Override
        protected void reduce(Text keyIn, Iterable<AllInfoBean> values,
                Context context) throws IOException, InterruptedException {
            for (AllInfoBean bean : values) {
                context.write(bean, NullWritable.get());
            }
        }
    }*/

 
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException, URISyntaxException {

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MapsideJoin.class);

        job.setMapperClass(FindFriendMapper.class);
        // no reducer class is set
        //job.setReducerClass(FindFriendReducer.class);
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(AllInfoBean.class);
        job.setNumReduceTasks(0); // map-only job: run no reduce phase
        // the final output key/value types
        job.setOutputKeyClass(AllInfoBean.class);
        job.setOutputValueClass(NullWritable.class);
        // third-party jars can be added to the classpath this way (local or HDFS paths)
        //job.addArchiveToClassPath(archive);
        // ship the product file to every task node via the distributed cache
        job.addCacheFile(new URI("hdfs://mini2:9000/Rjoin/dat2/product"));

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }

}
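
The code above references an AllInfoBean class that is not included in this listing. A minimal sketch of what it needs to look like, assuming it simply implements Hadoop's Writable over the fields the mapper sets and renders itself as CSV for TextOutputFormat (the field order and toString() format here are assumptions, not the original class):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class AllInfoBean implements Writable {

        private int oderid;          // order id (spelling kept from the original setters)
        private String date = "";
        private String pid = "";
        private int amount;
        private String pname = "";
        private String price = "";
        private String category_id = "";

        public void setOderid(int oderid) { this.oderid = oderid; }
        public void setDate(String date) { this.date = date; }
        public void setPid(String pid) { this.pid = pid; }
        public void setAmount(int amount) { this.amount = amount; }
        public void setPname(String pname) { this.pname = pname; }
        public void setPrice(String price) { this.price = price; }
        public void setCategory_id(String category_id) { this.category_id = category_id; }

        @Override
        public void write(DataOutput out) throws IOException {
            // serialize in a fixed field order
            out.writeInt(oderid);
            out.writeUTF(date);
            out.writeUTF(pid);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeUTF(price);
            out.writeUTF(category_id);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // deserialize in the same order as write()
            oderid = in.readInt();
            date = in.readUTF();
            pid = in.readUTF();
            amount = in.readInt();
            pname = in.readUTF();
            price = in.readUTF();
            category_id = in.readUTF();
        }

        @Override
        public String toString() {
            // what TextOutputFormat writes for each key in this map-only job
            return oderid + "," + date + "," + pid + "," + amount + "," + pname;
        }
    }

To run the job, something along these lines (the jar name and input/output paths are hypothetical; the product file must already exist at hdfs://mini2:9000/Rjoin/dat2/product):

    hadoop jar mapjoin.jar my.hadoop.hdfs.mapreduceJoin.MapsideJoin /Rjoin/orders /Rjoin/output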
Original post (Chinese): https://www.cnblogs.com/rocky-AGE-24/p/6913131.html