MapReduce 练习:输入是两个文件。file1 代表工厂表,包含工厂名列和地址编号列;file2 代表地址表,包含地址编号列和地址名列。要求从输入数据中找出工厂名与地址名的对应关系,输出"工厂名----地址名"表。

文件如下(各列之间以制表符 Tab 分隔,下文显示为空格):

file1:

Beijing Red Star    1
Shenzhen Thunder    3
Guangzhou Honda    2
Beijing Rising    1
Guangzhou Development Bank    2
Tencent    3
Back of Beijing    1

file2:

1    Beijing
2    Guangzhou
3    Shenzhen
4    Xian

代码如下(由于水平有限,不保证完全正确,如果发现错误欢迎指正):

package com;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Test {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration config = new  Configuration();
        config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
        config.set("yarn.resourcemanager.hostname", "192.168.0.100");
        
        FileSystem fs = FileSystem.get(config);
        
        Job job = Job.getInstance(config);
        
        job.setJarByClass(Test.class);
        
        //设置所用到的map类
        job.setMapperClass(myMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        //设置用到的reducer类
        job.setReducerClass(myReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        //设置s输入输出地址
        FileInputFormat.addInputPath(job, new Path("/FactoryName/"));
        
        Path path = new Path("/output2/");
        
        if(fs.exists(path)){
            fs.delete(path, true);
        }
        
        //指定文件的输出地址
        FileOutputFormat.setOutputPath(job, path);
        
        //启动处理任务job
        boolean completion = job.waitForCompletion(true);
        if(completion){
            System.out.println("Job Success!");
        }
    }
    
    public static class myMapper extends Mapper<Object, Text, Text, Text> {

        // 实现map函数
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String temp=new String();// 左右表标识
            String values=value.toString();
            String words[]=values.split("	");
            
            String mapkey = new String();
            String mapvalue = new String(); 
           
            //右表:1  Beijing
            if (words[0].charAt(0) >= '0' && words[0].charAt(0) <= '4') {
                mapkey = words[0];
                mapvalue =words[1];
                temp = "2";
                
            }else{
                //左表:Beijing Red Star  1
                mapkey = words[1];
                mapvalue =words[0];
                temp = "1";
            }
            
            // 输出左右表
            //左表:(1,1+Beijing Red Star)
            //右表:(1,2+Beijing)
            context.write(new Text(mapkey), new Text(temp + "+"+ mapvalue));
        }
    }

    //reduce解析map输出,将value中数据按照左右表分别保存
    public static class myReducer extends Reducer<Text, Text, Text, Text> {
        // 实现reduce函数
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            
            List<String> factory = new ArrayList<String>();
            List<String> address = new ArrayList<String>();
            
            for (Text value : values) {
                // 取得左右表标识
                char temp=(char) value.charAt(0);
                String words[]=value.toString().split("[+]");//1,Beijing Red Star
                
                if(temp=='1'){
                    factory.add(words[1]);// 左表
                }
                
                if(temp=='2'){
                    address.add(words[1]);// 右表
                }
            }
            
            //求出笛卡尔积,并输出
            for (String f : factory) {
                for (String a : address) {
                    context.write(new Text(f), new Text(a));
                }
            }
        }
    } 
}

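输出结果如下(原文此处为截图。按上述输入与代码,预期输出如下:两列以制表符分隔,按地址编号分组;同一编号内各行的先后顺序不保证;编号 4(Xian)没有对应工厂,因此不输出):

Beijing Red Star    Beijing
Beijing Rising    Beijing
Back of Beijing    Beijing
Guangzhou Honda    Guangzhou
Guangzhou Development Bank    Guangzhou
Shenzhen Thunder    Shenzhen
Tencent    Shenzhen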

如果您认为这篇文章还不错或者有所收获,您可以通过右边的“打赏”功能 打赏我一杯咖啡【物质支持】,也可以点击下方的【好文要顶】按钮【精神支持】,因为这两种支持都是使我继续写作、分享的最大动力!

原文地址:https://www.cnblogs.com/supiaopiao/p/7268756.html