大数据之mapreduce小实战

手写wordcount的程序

1、pom.xml

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>

2、新建Mapper类

package com.hsiehchou.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 海量数据
*
* hello hsiehchou
* nihao
*
* 数据的输入与输出以Key value进行传输
* keyIN:LongWritable(Long) 数据的起始偏移量
* valueIN:具体数据
*
* mapper需要把数据传递到reducer阶段(<hello,1>)
* keyOut:单词 Text
* valueOut:出现的次数IntWritable
*
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused output objects: context.write() serializes them immediately,
    // so reusing avoids allocating two objects per token (standard MR idiom).
    private final Text word = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Splits each input line on single spaces and emits a {@code <word, 1>}
     * pair per token for the reducer to aggregate.
     *
     * @param key     byte offset of the line within the input split (unused)
     * @param value   one line of input text, e.g. "hello hsiehchou"
     * @param context sink for the {@code <Text, IntWritable>} map output
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Tokenize on a single space, matching the input data format.
        String[] words = value.toString().split(" ");
        for (String w : words) {
            word.set(w);
            context.write(word, ONE);
        }
    }
}

3、新建Reducer类

package com.hsiehchou.wordcount;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reducer阶段接收的是Mapper输出的数据
* mapper的输出是reducer输入
*
* keyIn:mapper输出的key的类型
* valueIn:mapper输出的value的类型
*
* reducer端输出的数据类型,想要一个什么样的结果<hello,1888>
* keyOut:Text
* valueOut:IntWritable
*
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Reused output value; Hadoop serializes it on each write, so one
    // instance per task is enough (avoids one allocation per key).
    private final IntWritable total = new IntWritable();

    /**
     * Sums all counts emitted by the mapper for one word and writes the
     * final {@code <word, totalCount>} pair.
     *
     * @param key     the word
     * @param values  all "1" counts the mappers emitted for this word
     * @param context sink for the final {@code <Text, IntWritable>} output
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Accumulate the occurrence count for this word.
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        // Write the aggregated result.
        total.set(sum);
        context.write(key, total);
    }
}

4、新建驱动类

package com.hsiehchou.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {

    /**
     * Configures and submits the word-count job.
     *
     * @param args args[0] = HDFS input path, args[1] = HDFS output path
     *             (the output path must not already exist)
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create the job from a default configuration.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Locate the jar by the driver class so the cluster can ship it.
        job.setJarByClass(WordCountDriver.class);

        // Wire up the mapper and reducer implementations.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Map output types: <word, 1>.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Final (reducer) output types: <word, totalCount>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output locations come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit, wait for completion, and report success via exit code.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
运行结果
[root@hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.wordcount.WordCountDriver /wc/in /wc/out
[root@hsiehchou121 ~]# hdfs dfs -cat /wc/out/part-r-00000
fd 1
fdgs 1
fdsbv 1
gd 1
hello 3

5、IDEA的相关使用

Ctrl+O导入相关未实现的方法
Maven中的Lifecycle的package可以直接打包成jar

案例分析

需求:运营商流量日志
10086
计算每个用户当前使用的总流量
思路?总流量 = 上行流量+下行流量
三个字段:手机号 上行流量 下行流量
技术选型:PB+
数据分析:海量数据(存储hdfs)
海量数据计算(分布式计算框架MapReduce)

实现

FlowBean类
package com.hsiehchou.logs;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 封装数据类型需要怎么做
* hadoop数据类型实现了序列化接口
* 如果自定义需要实现这个序列化接口
*/
/**
 * Value object carrying a subscriber's upstream flow, downstream flow,
 * and their sum. Implements Hadoop's {@link Writable} so it can travel
 * between map and reduce as a value type.
 */
public class FlowBean implements Writable {

    // Upstream bytes, downstream bytes, and their precomputed sum.
    private long upFlow;
    private long dfFlow;
    private long flowsum;

    /** No-arg constructor required by Hadoop for deserialization. */
    public FlowBean() {
    }

    /**
     * Builds a bean from the two flow components; the total is derived.
     * NOTE(review): the individual setters below do NOT recompute
     * flowsum — confirm callers always use this constructor or call
     * setFlowsum explicitly.
     */
    public FlowBean(long up, long down) {
        this.upFlow = up;
        this.dfFlow = down;
        this.flowsum = up + down;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long up) {
        this.upFlow = up;
    }

    public long getDfFlow() {
        return dfFlow;
    }

    public void setDfFlow(long down) {
        this.dfFlow = down;
    }

    public long getFlowsum() {
        return flowsum;
    }

    public void setFlowsum(long total) {
        this.flowsum = total;
    }

    /** Serialization: field order must match {@link #readFields}. */
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowsum);
    }

    /** Deserialization: reads fields in the exact order written above. */
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowsum = in.readLong();
    }

    /** Space-separated "up down total", used verbatim in job output. */
    @Override
    public String toString() {
        return upFlow + " " + dfFlow + " " + flowsum;
    }
}
FlowCountMapper类
package com.hsiehchou.logs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* keyIN:
* valueIN:
*
* 思路:根据想要的结果的kv类型 手机号 流量总和(上行+下行)自定义类
* keyOut:
* valueOut:
*/
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Reused output key; serialized on each context.write() (standard MR
    // idiom). The FlowBean is created per record because only its
    // constructor derives flowsum from the two components.
    private final Text phone = new Text();

    /**
     * Parses one log record and emits {@code <phoneNumber, FlowBean>}.
     *
     * Assumes a space-separated record where field 1 is the phone number
     * and the two fields before the last one are upstream and downstream
     * flow — TODO confirm against the actual log schema.
     *
     * @param key     byte offset of the line in the split (unused)
     * @param value   one raw log line
     * @param context sink for the {@code <Text, FlowBean>} map output
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the raw record into its fields.
        String[] fields = value.toString().split(" ");
        // Extract phone number and the two flow columns.
        String phoneNr = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dfFlow = Long.parseLong(fields[fields.length - 2]);
        // Emit to the reducer keyed by phone number.
        phone.set(phoneNr);
        context.write(phone, new FlowBean(upFlow, dfFlow));
    }
}
FlowCountReducer类
package com.hsiehchou.logs;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Aggregates per-phone flow records: sums upstream and downstream
 * traffic separately, then emits one {@code FlowBean} (whose constructor
 * derives the grand total) per phone number.
 */
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // Running totals for the two flow directions.
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean bean : values) {
            totalUp += bean.getUpFlow();
            totalDown += bean.getDfFlow();
        }
        // The FlowBean constructor computes the combined sum.
        context.write(key, new FlowBean(totalUp, totalDown));
    }
}
FlowCountDriver类
package com.hsiehchou.logs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCountDriver {

    /**
     * Configures and submits the flow-aggregation job, using
     * {@link CombineTextInputFormat} to pack many small input files
     * into fewer splits.
     *
     * @param args args[0] = HDFS input path, args[1] = HDFS output path
     *             (the output path must not already exist)
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Build the job from a default configuration.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Ship the jar containing this driver class to the cluster.
        job.setJarByClass(FlowCountDriver.class);

        // Wire up the mapper and reducer implementations.
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Map output types: <phoneNumber, FlowBean>.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Final (reducer) output types: <phoneNumber, FlowBean>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Small-file optimization: combine many small files into fewer
        // splits instead of one map task per tiny file.
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Split size bounds: max 8 MB, min 6 MB.
        CombineTextInputFormat.setMaxInputSplitSize(job, 8L * 1024 * 1024);
        CombineTextInputFormat.setMinInputSplitSize(job, 6L * 1024 * 1024);

        // Input and output locations come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit, wait for completion, and report success via exit code.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
运行结果
[root@hsiehchou121 ~]# hdfs dfs -mkdir -p /flow/in
[root@hsiehchou121 ~]# hdfs dfs -put HTTP_20180313143750.dat /flow/in
[root@hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.logs.FlowCountDriver /flow/in /flow/out
[root@hsiehchou121 ~]# hdfs dfs -cat /flow/out/part-r-00000
13480253104 120 1320 1440
13502468823 735 11349 12084
13510439658 1116 954 2070
13560436326 1136 94 1230
13560436666 1136 94 1230
13560439658 918 4938 5856
13602846565 198 910 1108
13660577991 660 690 1350
13719199419 240 0 240
13726130503 299 681 980
13726238888 2481 24681 27162
13760778710 120 120 240
13822544101 264 0 264
13884138413 4116 1432 5548
13922314466 3008 3720 6728
13925057413 11058 4243 15301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 369 338 707
15889002119 938 380 1318
15920133257 316 296 612
18212575961 1527 2106 3633
18320173382 9531 212 9743

小文件优化

如果企业中存在海量的小文件数据
TextInputFormat按照文件规划切片,文件不管多小都是一个单独的切片,都会启动一个maptask任务去执行,这样会产生大量的maptask,浪费资源。
优化手段:
将小文件合并成大文件(前提是不改动小文件的内容),或使用CombineTextInputFormat将多个小文件规划到同一个切片中。

原文地址:https://www.cnblogs.com/hsiehchou/p/10403452.html