复合式MapReduce之ChainJob

依赖式的场景

我们可以设想一下MapReduce有2个子任务job1,job2构成,job2要在job1完成之后才执行。

job1:用上篇写的合并小文件

job2:使用单词计数

这种关系就叫复杂数据依赖关系的组合时mapreduce。

hadoop为这种组合关系提供了一种执行和控制机制,hadoop通过job和jobControl类提供具体的编程方法。

Job除了维护子任务的配置信息,还维护子任务的依赖关系。 而jobControl控制整个作业流程,把所有的子任务作业加入到JobControl中,执行JobControl的run()方法即可运行程序

一、编写WcDMergeChainJob:实现复合式MapReduce

package cn.itxiaobai;

import com.google.common.io.Resources;
import com.utils.CDUPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.List;

public class WcDMergeChainJob {
    public static void main(String[] args) throws IOException, InterruptedException {
        //创建配置
        Configuration configuration = new Configuration();
        //加载配置文件
        configuration.addResource(Resources.getResource("core-site-local.xml"));
        //合并小文件
        Job job1 = setJob1();
        //单词计数
        Job job2 = setJob2();

        ControlledJob controlledJob1 = new ControlledJob(configuration);
        controlledJob1.setJob(job1);

        ControlledJob controlledJob2 = new ControlledJob(configuration);
        controlledJob2.setJob(job2);

        //设置依赖关系
        controlledJob2.addDependingJob(controlledJob1);

        JobControl jobControl = new JobControl("word count depend merge small file");

        jobControl.addJob(controlledJob1);
        jobControl.addJob(controlledJob2);
        //开启线程执行jobControl
        new Thread(jobControl).start();

        //打印正在执行线程的详情
        while (true){
            List<ControlledJob> jobList = jobControl.getRunningJobList();
            System.out.println(jobList);
            Thread.sleep(5000);
        }
    }

    private static Job setJob2() throws IOException {
        //创建配置
        Configuration configuration = new Configuration();
        //加载配置文件
        configuration.addResource(Resources.getResource("core-site-local.xml"));
        //设定一个任务
        Job job2 = Job.getInstance(configuration, "my word count local");
        //设置执行的job主类
        job2.setJarByClass(WCJob.WCJober.class);
        //设置Map的加载类以及输出类型
        job2.setMapperClass(WCJob.WCMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(IntWritable.class);
        //设置Reduce的加载类以及job/reduce的输出类型
        job2.setReducerClass(WCJob.WCReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);

        //创建任务输入与输出路径
        FileInputFormat.addInputPath(job2,new Path("/mymergeout"));
        //在输出路径之前判断一下是否存在,如果存在删除
        CDUPUtils.deleteFileName("/mywcout");
        FileOutputFormat.setOutputPath(job2,new Path("/mywcout"));
        return job2;
    }

    private static Job setJob1() throws IOException {
        Configuration coreSiteConf = new Configuration();
        coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));
        //设置一个任务
        Job job1 = Job.getInstance(coreSiteConf, "my small merge big file");
        //设置job的运行类
        job1.setJarByClass(MergeSmallFileJob.MyJob.class);

        //设置Map和Reduce处理类
        job1.setMapperClass(MergeSmallFileJob.MergeSmallFileMapper.class);
        job1.setReducerClass(MergeSmallFileJob.MergeSmallFileReduce.class);

        //map输出类型
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        //设置job/reduce输出类型
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

        FileSystem fileSystem = FileSystem.get(coreSiteConf);
        //listFiles:可以迭代便利文件
        RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();
            Path filesPath = fileStatus.getPath();
            if (!fileStatus.isDirectory()) {
                //判断大小 及格式
                if (fileStatus.getLen() < 2 * 1014 * 1024 && filesPath.getName().contains(".txt")) {
                    //文件输入路径
                    FileInputFormat.addInputPath(job1,filesPath);
                }
            }
        }

        //删除存在目录
        CDUPUtils.deleteFileName("/mymergeout");

        FileOutputFormat.setOutputPath(job1, new Path("/mymergeout"));
        return job1;
    }
}

二、里面用到自己写的工具类CDUPUtils :用于删除已存在目录以及阅读文件内容


package com.utils;
 
import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.util.ArrayList;
 
public class CDUPUtils {
    //删除已经存在在hdfs上面的文件文件
    public static void deleteFileName(String path) throws IOException {
        //将要删除的文件
        Path fileName = new Path(path);
        Configuration entries = new Configuration();
        //解析core-site-master2.xml文件
        entries.addResource(Resources.getResource("core-site-local.xml"));
        //coreSiteConf.set(,);--在这里可以添加配置文件
        FileSystem fileSystem = FileSystem.get(entries);
        if (fileSystem.exists(fileName)){
            System.out.println(fileName+"已经存在,正在删除它...");
            boolean flag = fileSystem.delete(fileName, true);
            if (flag){
                System.out.println(fileName+"删除成功");
            }else {
                System.out.println(fileName+"删除失败!");
                return;
            }
        }
        //关闭资源
        fileSystem.close();
    }
 
    //读取文件内容
    public static void readContent(String path) throws IOException {
        //将要读取的文件路径
        Path fileName = new Path(path);
        ArrayList<String> returnValue = new ArrayList<String>();
        Configuration configuration = new Configuration();
        configuration.addResource(Resources.getResource("core-site-local.xml"));
        //获取客户端系统文件
        FileSystem fileSystem = FileSystem.get(configuration);
        //open打开文件--获取文件的输入流用于读取数据
        FSDataInputStream inputStream = fileSystem.open(fileName);
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
        //一行一行的读取数据
        LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader);
        //定义一个字符串变量用于接收每一行的数据
        String str = null;
        //判断何时没有数据
        while ((str=lineNumberReader.readLine())!=null){
            returnValue.add(str);
        }
        //打印数据到控制台
        System.out.println("MapReduce算法操作的文件内容如下:");
        for (String read :
                returnValue) {
            System.out.println(read);
        }
        //关闭资源
        lineNumberReader.close();
        inputStream.close();
        inputStreamReader.close();
    }
}

三、配置文件:cort-site-local.xml


<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master2:9000</value>
    </property>
    <property>
        <name>fs.hdfs.impl</name>
        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
    </property>
</configuration>

四、pom中添加的依赖 


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.zhiyou100</groupId>
    <artifactId>mrdemo</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <properties>
        <org.apache.hadoop.version>2.7.5</org.apache.hadoop.version>
    </properties>
 
    <!--分布式计算-->
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>${org.apache.hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
                <version>${org.apache.hadoop.version}</version>
            </dependency>
 
        <!--分布式存储-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        </dependencies>
</project>

五、在本地直接运行(右击Run)测试

表示正在运行job1

表示正在运行job2

在hdfs上查看,出现线面内容,表示运行成功!!!

hadoop fs -cat /mywcout/part-r-00000

原文地址:https://www.cnblogs.com/pigdata/p/10305599.html