Commands for running Hadoop MapReduce on Linux

I was following a tutorial video from 2016 and overlooked how much the tooling has changed since then; some of the commands are different now, and I got burned badly by this, so I'm writing it down here.

To run a MapReduce program, you first need to write some code against the Java API:

We need a Mapper class and a Reducer class; here I put both of them, along with the main method, into a single class. The input file is a mobile-phone traffic example (see the sample data below).

package mrFlowSum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {
    /*
     * Mapper
     */
    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Turn the line into a String
            String line = value.toString();
            // Split the tab-separated fields
            String[] fields = line.split("\t");
            // Field 1 is the phone number
            String phoneNum = fields[1];
            // The third- and second-to-last fields are the upstream and downstream traffic
            // (indexed from the end because the middle columns are not always present)
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow));
        }
    }

    /*
     * Reducer
     */
    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        // Values arrive grouped by key: <183323, bean1> <183323, bean2> <183323, bean3> <183323, bean4>
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

            long sum_upFlow = 0;
            long sum_dFlow = 0;

            // Iterate over all the beans, accumulating upstream and downstream traffic separately
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_dFlow += bean.getDownFlow();
            }

            FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
            context.write(key, resultBean);
        }
    }



    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // To run in cluster mode, i.e. submit the program to YARN,
        // these parameters must point at the cluster
        conf.set("fs.defaultFS", "hdfs://min1:9000/");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "min1");
        conf.set("yarn.resourcemanager.address", "min1:8032");
        conf.set("yarn.resourcemanager.scheduler.address", "min1:8030");

        Job job = Job.getInstance(conf, "flowcount");

//        job.setJar("c:/wc.jar");
        // Point the job at the local jar containing this class
        job.setJarByClass(FlowCount.class);

        // The Mapper/Reducer business classes this job uses
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Directory of the job's raw input files
        FileInputFormat.setInputPaths(job, new Path("/flowsum/input"));
        // Directory for the job's output (must not exist yet)
        FileOutputFormat.setOutputPath(job, new Path("/flowsum/output"));

        // Submit the job's configuration, and the jar containing its classes,
        // to YARN, then block until the job finishes
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }

}

We wrap the parsed values in a custom FlowBean class, which implements Hadoop's Writable interface:

package mrFlowSum;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Deserialization instantiates the bean via reflection through the no-arg constructor, so one must be defined explicitly
    public FlowBean() {
    }


    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = downFlow+upFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    /*
     * Serialization method
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }
    /*
     * Deserialization method
     * Note: the fields must be read back in exactly the same order they were written
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

A sample of flow.data (fields are tab-separated; the phone number is field 1, and the upstream and downstream traffic sit in the third- and second-to-last fields — the Mapper indexes from the end because the middle host/category columns are not always present):

1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157995052     13826544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4            4    0    264    0    200
1363157991076     13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
1363154400022     13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
1363157993044     18211575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
1363157995074     84138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
1363157993055     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
1363157995033     15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    3156    2936    200
1363157983019     13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
1363157984041     13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    6960    690    200
1363157973098     15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    3659    3538    200
1363157986029     15989002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    1938    180    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157984040     13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    1938    2910    200
1363157995093     13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
1363157982040     13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    7335    110349    200
1363157986072     18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    2412    200
1363157990043     13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    48243    200
1363157988072     13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
1363157985066     13726238888    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157993055     13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200

Build the jar in IDEA and upload it to the Linux server, along with the sample file (flow.data). Then run

hadoop fs -put flow.data /flowsum/input

to put the file into the HDFS input directory /flowsum/input.
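
If /flowsum/input does not exist yet, create it first; listing the directory afterwards confirms the upload (assuming the same paths as above):

hadoop fs -mkdir -p /flowsum/input
hadoop fs -ls /flowsum/input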

Then run the program with the command hadoop jar mapreduce.jar /flowsum/input /flowsum/output2. (I took a big fall here: following the tutorial video, I used hadoop jar mapreduce.jar mrFlowSum.FlowCount /flowsum/input /flowsum/output2, which carries an extra main-class argument. My main class was already declared in the pom, so the jar's manifest names it and the argument has to be left out; see the pom sketch below. Note also that main() above hardcodes the input and output paths and never reads its arguments, so the output actually lands in /flowsum/output.)
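
For reference, the main class gets baked into the jar's manifest by a pom entry along these lines (a sketch only; the author's actual pom isn't shown in the post, so the choice of maven-jar-plugin here is my assumption):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <!-- With this set, `hadoop jar` must not be given the class name again -->
                <mainClass>mrFlowSum.FlowCount</mainClass>
            </manifest>
        </archive>
    </configuration>
</plugin>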

After a successful run, the output directory appears in HDFS, containing a _SUCCESS marker file and part-r-xxxxx files (one per reducer; with the default single reducer, just part-r-00000). The latter hold the final result.
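
While the job runs, or afterwards to debug a failure, it can also be followed from the command line (assuming a standard YARN setup; the logs command additionally requires log aggregation to be enabled):

yarn application -list
yarn logs -applicationId <application_id>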

We can print the final result with:

hadoop fs -cat /flowsum/output/part-r-00000
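
As a sanity check against the sample data above: 13560439658 appears on two lines (upstream 1116 + 918, downstream 954 + 4938), so its result line, formatted by FlowBean.toString(), should come out as:

13560439658	2034	5892	7926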

Original post: https://www.cnblogs.com/fjlcoding/p/10298856.html