Hadoop HDFS: Practicing MapReduce with IDEA (Part 8)

HDFS Basic Commands

  • Create a directory
    • hdfs dfs -mkdir -p  /data/dist1
  • Upload a file
    • hdfs dfs -put dist.txt /data/dist1
  • List the files under a directory (recursively)
    • hdfs dfs -ls -R /data/topn
  • View a specific file
    • hdfs dfs -cat /data/topn/output3/part-r-00000

Exercise: for each month, find the highest and the second-highest temperature; the two readings must not come from the same day.

We have a set of records in which the fields are separated by a tab character ('\t').
Data format: date, city ID, temperature
2019-6-1 22:22:22	1	39
2019-5-21 22:22:22	3	33
2019-6-1 22:22:22	1	38
2019-6-2 22:22:22	2	31
2018-3-11 22:22:22	3	18
2018-4-23 22:22:22	1	22
1970-8-23 22:22:22	2	23
1970-8-8 22:22:22	1	32
2019-6-1 22:22:22	1	39
2019-5-21 22:22:22	3	33
2019-6-1 22:22:22	1	44
2019-6-2 22:22:22	2	50
2018-3-11 22:22:22	3	18
2018-4-23 22:22:22	1	65
1970-8-23 22:22:22	2	66
1970-8-8 22:22:22	1	77

Create the MyTopN class:


package com.xiaoke.mapreduce.topn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyTopN {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration(true);

        // Tell the framework the job is submitted from a Windows (cross-platform) client
        configuration.set("mapreduce.app-submission.cross-platform", "true");

        configuration.set("mapreduce.framework.name", "local");

        // The parameters below follow the example in the Job javadoc
        Job job = Job.getInstance(configuration);

        job.setJarByClass(MyTopN.class);
        //job.setJar("D:\\code\\mayun_hadoop\\test\\hadoop\\target\\hadoop-hdfs-1.0-SNAPSHOT.jar");

        // Specify various job-specific parameters
        job.setJobName("topN1");

        Path inputPath = new Path("/data/topn/input1");
        TextInputFormat.setInputPaths(job, inputPath);

        Path outputPath = new Path("/data/topn/output2");
        if (outputPath.getFileSystem(configuration).exists(outputPath))
            outputPath.getFileSystem(configuration).delete(outputPath, true);
        TextOutputFormat.setOutputPath(job, outputPath);

        // Map settings
        job.setMapOutputKeyClass(TKey.class);
        job.setMapperClass(TMapper.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Partitioning: partition by year (partitions are coarser than the year+month groups used later);
        // the partitioner's only contract is that equal keys get the same partition number
        job.setPartitionerClass(TPartitioner.class);
        // Sort by year, month and temperature, with temperature descending
        job.setSortComparatorClass(TSortComparator.class);

        // Reduce settings
        job.setGroupingComparatorClass(TGroupingComparator.class); // group by year and month
        job.setReducerClass(TReducer.class);

        job.waitForCompletion(true);
    }
}
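One thing the driver above does not configure is the number of reduce tasks, which defaults to 1; with a single reducer the year-based partitioning has no visible effect. A minimal sketch (three reducers is just an assumed example value):

// Assumed example: give the job several reducers so TPartitioner's
// year % numPartitions actually spreads the years across partitions.
job.setNumReduceTasks(3);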
 

Create the TKey class:

package com.xiaoke.mapreduce.topn;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// A custom key type must implement WritableComparable: serialization/deserialization plus a comparator
public class TKey implements WritableComparable<TKey> {

    private int year;
    private int month;
    private int day;
    // temperature
    private int wd;


    @Override
    public int compareTo(TKey that) {
        // Optional: since the job registers its own sort comparator (TSortComparator), this default year/month/day ordering is not used during the shuffle
        int year = Integer.compare(this.year, that.getYear());
        if (year == 0) {
            int month = Integer.compare(this.month, that.getMonth());
            if (month == 0) {
                return Integer.compare(this.day, that.getDay());
            }
            return month;
        }
        return year;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.day = in.readInt();
        this.wd = in.readInt();
    }


    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getWd() {
        return wd;
    }

    public void setWd(int wd) {
        this.wd = wd;
    }
}
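TReducer below logs the incoming key with key.toString(); without an override that prints only the default object representation. An optional addition to TKey, purely for readable debug output:

    // Optional: human-readable key, used only by the debug println in TReducer.
    @Override
    public String toString() {
        return year + "-" + month + "-" + day + " " + wd;
    }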

Create the TMapper class:

package com.xiaoke.mapreduce.topn;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class TMapper extends Mapper<LongWritable, Text, TKey, IntWritable> {

    TKey mkey = new TKey();
    IntWritable mval = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        // Good habit: don't trust the input blindly
        // value:  2019-6-1 22:22:22    1    31
        String[] split = StringUtils.split(value.toString(), '\t');
        // split[0]: 2019-6-1 22:22:22
        // split[1]: 1   (city ID)
        // split[2]: 31  (temperature)

        try {
            Date data = sdf.parse(split[0]);
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(data);
            mkey.setYear(calendar.get(Calendar.YEAR));
            mkey.setMonth(calendar.get(Calendar.MONTH) + 1);
            mkey.setDay(calendar.get(Calendar.DAY_OF_MONTH));

            int wd = Integer.parseInt(split[2]);
            mkey.setWd(wd);
            mval.set(wd);

            // TKey orders by year, month, day
            // The map output is: key = (year, month, day, temperature), value = temperature
            context.write(mkey, mval);
        } catch (ParseException e) {
            e.printStackTrace();
        }


    }
}

Create the TPartitioner class:

package com.xiaoke.mapreduce.topn;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class TPartitioner extends Partitioner<TKey, IntWritable> {

    // 1. Keep the partitioner simple...
    // Partition by year (coarser than the year+month grouping used later)
    // The partitioner's only contract: equal keys must receive the same partition number

    @Override
    public int getPartition(TKey tKey, IntWritable intWritable, int numPartitions) {
        // numPartitions is the number of reduce tasks (job.setNumReduceTasks / mapreduce.job.reduces)
        return tKey.getYear() % numPartitions;
    }
}

Create the TSortComparator class:

package com.xiaoke.mapreduce.topn;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TSortComparator extends WritableComparator {

    // The no-arg constructor is required; super(TKey.class, true) makes WritableComparator deserialize TKey instances before compare() is called
    public TSortComparator() {
        super(TKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TKey k1 = (TKey) a;
        TKey k2 = (TKey) b;
        // Order by year, month, temperature, with temperature descending:
        int c1 = Integer.compare(k1.getYear(), k2.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(k1.getMonth(), k2.getMonth());
            if (c2 == 0) {
                return -Integer.compare(k1.getWd(), k2.getWd());
            }
            return c2;
        }
        return c1;
    }


}

Create the TGroupingComparator class:

package com.xiaoke.mapreduce.topn;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TGroupingComparator extends WritableComparator {

    public TGroupingComparator() {
        super(TKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TKey k1 = (TKey) a;
        TKey k2 = (TKey) b;
        // Group by year and month
        int c1 = Integer.compare(k1.getYear(), k2.getYear());
        if (c1 == 0) {
            return Integer.compare(k1.getMonth(), k2.getMonth());
        }
        return c1;
    }
}

Create the TReducer class:

package com.xiaoke.mapreduce.topn;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class TReducer extends Reducer<TKey, IntWritable, Text, IntWritable> {

    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    @Override
    protected void reduce(TKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Example of what one reduce call sees (key -> value), values sorted by temperature descending:
        // 1970-6-4 33   33
        // 1970-6-4 32   32
        // 1970-6-22 31   31
        // 1970-6-4 22   22
        System.out.println("- - - --- reduce - - TKey - -  -" + key.toString());

        // Question: while iterating over the values, does the key change? Yes --
        // the framework reuses the key object and overwrites its fields for each record,
        // so key always describes the record the current value came from (keep pass-by-reference in mind)
        Iterator<IntWritable> iterator = values.iterator();
        int flg = 0;
        int day = 0;
        while (iterator.hasNext()) {
            IntWritable val = iterator.next();
            System.out.println("TReducer values key - - - -" + val);

            if (flg == 0) {
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rval.set(key.getWd());
                context.write(rkey, rval);
                flg++;
                day = key.getDay();
            }

            if (flg != 0 && day != key.getDay()) {
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
                rval.set(key.getWd());
                context.write(rkey, rval);
                break;
            }

        }

    }
}
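To see how the pieces interact, here is a trace of the 2019-6 group from the sample data: TGroupingComparator puts all June 2019 records into one reduce call, and TSortComparator has already ordered them by temperature descending, so the loop writes the monthly maximum first and then the best value from a different day:

// Reduce group (2019, 6), values in descending temperature order:
//   2019-6-2  50   -> written (first value = monthly maximum)
//   2019-6-1  44   -> written (first value from a different day), then break
//   2019-6-1  39
//   2019-6-1  39
//   2019-6-1  38
//   2019-6-2  31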

Notes

  • Local (single-JVM) mode:
    configuration.set("mapreduce.framework.name", "local");
  • Cluster mode (submitting the jar from the local machine):
    // comment out configuration.set("mapreduce.framework.name", "local");
    job.setJar("D:\\code\\mayun_hadoop\\test\\hadoop\\target\\hadoop-hdfs-1.0-SNAPSHOT.jar");

Final result

1970-8-8    77
1970-8-23    66
2018-3-11    18
2018-4-23    65
2019-5-21    33
2019-6-2    50
2019-6-1    44

Replacing the city IDs with location names

Approach: decide whether the lookup should happen in the map phase or in the reduce phase.

  • If the lookup data is small, handle it on the map side (as done below, via the distributed cache); if it is large, handle it on the reduce side.

Original location data

[root@ke01 bigdata]# hdfs dfs -cat /data/dist/dist.txt
1 beijing
2 shanghai
3 Guangzhou

Add to the MyTopN class:

job.addCacheFile(new Path("/data/dist1/dist.txt").toUri());

Add to the TKey class, and include the new field in write/readFields:

private String location;
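A minimal sketch of what "include it in write/readFields" means, together with the getter/setter the mapper and reducer need (the use of writeUTF/readUTF is an assumption; note that writeUTF rejects null, so every record should get a location):

@Override
public void write(DataOutput out) throws IOException {
    out.writeInt(year);
    out.writeInt(month);
    out.writeInt(day);
    out.writeInt(wd);
    out.writeUTF(location);   // new field; write() and readFields() must stay in the same order
}

@Override
public void readFields(DataInput in) throws IOException {
    this.year = in.readInt();
    this.month = in.readInt();
    this.day = in.readInt();
    this.wd = in.readInt();
    this.location = in.readUTF();
}

public String getLocation() {
    return location;
}

public void setLocation(String location) {
    this.location = location;
}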

Override the setup method in TMapper; it runs once before map is called:

    // Requires additional imports: java.net.URI, java.io.BufferedReader,
    // java.io.File, java.io.FileReader, java.util.HashMap, org.apache.hadoop.fs.Path
    public HashMap<String, String> dict = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Cached dictionary file:
        //   1    beijing
        //   2    shanghai
        //   3    Guangzhou
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0].getPath());
        // The cached file is localized to the task's working directory,
        // so it can be opened by its plain file name.
        BufferedReader bufferedReader = new BufferedReader(new FileReader(new File(path.getName())));
        String line = bufferedReader.readLine();
        while (line != null) {
            String[] split = line.split(" ");
            dict.put(split[0], split[1]);
            line = bufferedReader.readLine();
        }
        bufferedReader.close();
    }


Finally, in the map method, set the location on the key so that it ends up in the output:
            mkey.setLocation(dict.get(split[1]));
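For the city name to show up in the final output below, the reducer's output key also has to include it; a minimal sketch, assuming the "_" separator seen in the result (getLocation() is the getter sketched above):

// In TReducer, wherever rkey is set:
rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay() + "_" + key.getLocation());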

Final output:

[root@ke01 bigdata]# hdfs dfs -cat /data/topn/output3/part-r-00000
1970-8-8_beijing    77
1970-8-23_shanghai    66
2018-3-11_Guangzhou    18
2018-4-23_beijing    65
2019-5-21_Guangzhou    33
2019-6-2_shanghai    50
2019-6-1_beijing    44

Code: https://gitee.com/Xiaokeworksveryhard/big-data.git
Original article: https://www.cnblogs.com/bigdata-familyMeals/p/14111020.html