1126每日博客

Mapreduce实例——Map端join

MapReduce提供了表连接操作其中包括Map端join、Reduce端join还有单表连接,现在我们要讨论的是Map端join,Map端join是指数据到达map处理函数之前进行合并的,效率要远远高于Reduce端join,因为Reduce端join是把所有的数据都经过Shuffle,非常消耗资源。

1.Map端join的使用场景:一张表数据十分小、一张表数据很大。

Map端join是针对以上场景进行的优化:将小表中的数据全部加载到内存,按关键字建立索引。大表中的数据作为map的输入,对map()函数每一对<key,value>输入,都能够方便地和已加载到内存的小数据进行连接。把连接结果按key输出,经过shuffle阶段,reduce端得到的就是已经按key分组并且连接好了的数据。

为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:

(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。

(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

2.本实验Map端Join的执行流程

(1)首先在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join连接的 <key ,value>键值对,将其解释分割放到内存中(可以放大Hash Map等等容器中)。

(2)要重写MyMapper类下面的setup()方法,因为这个方法是先于map方法执行的,将较小表先读入到一个HashMap中。

(3)重写map函数,一行行读入大表的内容,逐一的与HashMap中的内容进行比较,若Key相同,则对数据进行格式化处理,然后直接输出。

(4)map函数输出的<key,value >键值对首先经过一个suffle把key值相同的所有value放到一个迭代器中形成values,然后将<key,values>键值对传递给reduce函数,reduce函数输入的key直接复制给输出的key,输入的values通过增强版for循环遍历逐一输出,循环的次数决定了<key,value>输出的次数。

1)Map端读取所有的文件,并在输出的内容里加上标识,代表数据是从哪个文件里来的。

(2)在reduce处理函数中,按照标识对数据进行处理。

(3)然后将相同key值进行join连接操作,求出结果并直接输出。

Mapreduce中join连接分为Map端Join与Reduce端Join,这里是一个Reduce端Join连接。程序主要包括两部分:Map部分和Reduce部分。

Map处理的是一个纯文本文件,Mapper处理的数据是由InputFormat将数据集切分成小的数据集InputSplit,并用RecordReader解析成<key,value>对提供给map函数使用。在map函数中,首先用getPath()方法获取分片InputSplit的路径并赋值给filePath,if判断filePath中如果包含goods.txt文件名,则将map函数输入的value值通过Split("\t")方法进行切分,与goods_visit文件里相同的商品id字段作为key,其他字段前加"1+"作为value。如果if判断filePath包含goods_visit.txt文件名,步骤与上面相同,只是把其他字段前加"2+"作为value。最后把<key,value>通过Context的write方法输出。

map函数输出的<key,value>经过shuffle将key相同的所有value放到一个迭代器中形成values,然后将<key,values>键值对传递给reduce函数。reduce函数中,首先新建两个Vector集合,用于存放输入的values中以"1+"开头和"2+"开头的数据。然后用增强版for循环遍历并嵌套if判断,若判断values里的元素以1+开头,则通过substring(2)方法切分元素,结果存放到left集合中,若values里元素以2+开头,则仍利用substring(2)方法切分元素,结果存放到right集合中。最后再用两个嵌套for循环,遍历输出<key,value>,其中输入的key直接赋值给输出的key,输出的value为left +"\t"+right。

代码如下:

package exper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoin {

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        private Map<String, String> dict = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            String fileName = context.getLocalCacheFiles()[0].getName();
            //System.out.println(fileName);
           
BufferedReader reader = new BufferedReader(new FileReader(fileName));
            String codeandname = null;
            while (null != (codeandname = reader.readLine())) {
                String str[] = codeandname.split("   ");
                dict.put(str[0], str[2] + "   " + str[3]);
            }
            reader.close();
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] kv = value.toString().split("\t");
            if (dict.containsKey(kv[1])) {
                context.write(new Text(kv[1]), new Text(dict.get(kv[1]) + "\t" + kv[2]));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text text : values) {
                context.write(key, text);
            }
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance();
        job.setJobName("mapjoin");
        job.setJarByClass(MapJoin.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path in = new Path("hdfs://hadoop102:9870/mymapreduce/5in/order_items1");
        Path out = new Path("hdfs://hadoop102:9870/mymapreduce/5out");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        URI uri = new URI("hdfs://hadoop102:9870/mymapreduce/5in/orders1");
        job.addCacheFile(uri);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

原文地址:https://www.cnblogs.com/ruangongwangxiansheng/p/14568488.html