hadoop_05

  • setup and cleanup
setup(): the MapReduce framework calls this method exactly once per map task, before any input is processed, so it is the place for one-time initialization of variables or resources. If that initialization were done inside map(), it would be repeated for every input line the Mapper parses, which is redundant and slows the program down.
cleanup(): the framework also calls this method exactly once per map task, after all input has been processed, so it is the place to release those variables or resources. Releasing them inside map() would mean freeing resources after every line and re-initializing them before the next one, which is again redundant and inefficient.
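As a minimal sketch (the class and field names are placeholders, not from the original post), a Mapper that initializes a per-task counter once in setup(), updates it for every record in map(), and emits it once in cleanup() looks roughly like this:

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SetupCleanupMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
	private long lineCount;                  // per-task state, initialized once
	private final Text k = new Text("lines");

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// called exactly once, before the first call to map()
		lineCount = 0;
	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// called once per input record; no per-record initialization needed here
		lineCount++;
	}

	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		// called exactly once, after the last call to map(): emit the total and release resources
		context.write(k, new LongWritable(lineCount));
	}
}
```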

## 1. Handling small files

  • Because MapReduce incurs noticeable per-file overhead, it is not well suited to processing large numbers of small files.

  • Merging small files:

    package com.xjk.mm;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    
    public class Merger {
    	static class MergerMapper extends Mapper<LongWritable,Text,Text,Text>{
    		String fileName = null;
    		protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context){
    			// get the name of the file this split comes from
    			FileSplit f = (FileSplit)context.getInputSplit();
    			fileName = f.getPath().getName();
    		}
    		
    		Text k = new Text();
    		Text v = new Text();
    		@Override
    		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
    				throws IOException, InterruptedException {
    			String line = value.toString();
    			k.set(fileName);
    			v.set(line);
    			context.write(k, v);
    		}
    	}	
    	static class MergerReducer extends Reducer<Text,Text,Text,Text>{
    		Text v = new Text();
    		protected void reduce(Text key, Iterable<Text> values, Reducer<Text,Text,Text,Text>.Context context) throws IOException, InterruptedException {
    			StringBuilder sb = new StringBuilder();
    			for (Text text : values) {
    				sb.append(text.toString() + " ");
    			}
    			v.set(sb.toString().trim());
    			context.write(key, v);
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
    		
    		Job job = Job.getInstance(conf);
    		job.setMapperClass(MergerMapper.class);
    		job.setReducerClass(MergerReducer.class);
    		
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		job.setNumReduceTasks(1);
    		FileInputFormat.setInputPaths(job, new Path("d:/data/merger/input/"));
    		FileOutputFormat.setOutputPath(job, new Path("d:/data/merger/output/"));
    		
    		boolean b = job.waitForCompletion(true);
    		
    		System.exit(b?0:-1);
    		
    	}
    }
    
    

## 2. The join method

- MapReduce supports table joins, including map-side join, reduce-side join, and semi-join. A map-side join merges the data before it ever reaches the map function and is far more efficient than a reduce-side join, because a reduce-side join pushes all of the data through the shuffle, which is very expensive. The example below implements a reduce-side join (records are grouped by user id in the reducer); a map-side join sketch follows it.

- Example

  ```
  orders.txt
  	order id   user id
  	order001,u002
  	order001,u003
  	...
  user.txt
  	order id   user id   user name   age   friend
  	order004,u001,senge,18,angelababy
  	...
  ```

  • Code:

    // JoinBean.java
    package com.xjk.join;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    /*
     * Bean holding the joined fields from both tables
     * */
    public class JoinBean implements Writable {
    	private String oid;
    	private String uid;
    	private String name;
    	private int age;
    	private String friend;
    	private String tables; // marks which table this record came from
    	public String getOid() {
    		return oid;
    	}
    	public void setOid(String oid) {
    		this.oid = oid;
    	}
    	public String getUid() {
    		return uid;
    	}
    	public void setUid(String uid) {
    		this.uid = uid;
    	}
    	public String getName() {
    		return name;
    	}
    	public void setName(String name) {
    		this.name = name;
    	}
    	public int getAge() {
    		return age;
    	}
    	public void setAge(int age) {
    		this.age = age;
    	}
    	public String getFriend() {
    		return friend;
    	}
    	public void setFriend(String friend) {
    		this.friend = friend;
    	}
    	public String getTables() {
    		return tables;
    	}
    	public void setTables(String tables) {
    		this.tables = tables;
    	}
    	@Override
    	public String toString() {
    		return uid + "," + name + "," + age + ","+ friend;
    	}
    	public void write(DataOutput out) throws IOException{
    		out.writeUTF(oid);
    		out.writeUTF(uid);
    		out.writeUTF(name);
    		out.writeInt(age);
    		out.writeUTF(friend);
    		out.writeUTF(tables);
    	}
    	public void readFields(DataInput in)throws IOException{
    		this.oid = in.readUTF();
    		this.uid = in.readUTF();
    		this.name = in.readUTF();
    		this.age = in.readInt();
    		this.friend = in.readUTF();
    		this.tables = in.readUTF();
    	}
    }
    
    // Join.java
    
    
    package com.xjk.join;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class Join {
    	static class JoinMapper extends Mapper<LongWritable, Text, Text, JoinBean>{
    		String fileName = null;
    		@Override
    		protected void setup(Mapper<LongWritable, Text, Text, JoinBean>.Context context)
    				throws IOException, InterruptedException {
    			FileSplit f = (FileSplit)context.getInputSplit();
    			fileName = f.getPath().getName();
    		}
    		Text k = new Text();
    		JoinBean j = new JoinBean();
    		@Override
    		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, JoinBean>.Context context)
    				throws IOException, InterruptedException {
    			String line = value.toString();
    			// parse the input line
    			String[] split = line.split(",");
    			if (fileName.startsWith("orders")) {
    				// a record from the orders file
    				j.setOid(split[0]);
    				j.setUid(split[1]);
    				// if the unused JoinBean fields were left null, serialization (writeUTF) would fail,
    				// so they are filled with a placeholder value instead
    				j.setName("01");
    				j.setAge(-1);
    				j.setFriend("01");
    				j.setTables("orders");// mark which file this record came from
    			}else {
    				// a record from the user file
    				j.setUid(split[0]);
    				j.setName(split[1]);
    				j.setAge(Integer.parseInt(split[2]));
    				j.setFriend(split[4]);
    				j.setOid("01");
    				j.setTables("user");
    			}
    			k.set(j.getUid());
    			context.write(k,j);
    		}
    	}
    	
    	static class JoinReducer extends Reducer<Text, JoinBean, Text, NullWritable>{
    		List<JoinBean> list = new ArrayList<>();
    		Map<String,JoinBean> map = new HashMap<>();	
    		@Override
    		protected void reduce(Text uid, Iterable<JoinBean> values,
    				Reducer<Text, JoinBean, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    			// iterate over the JoinBean records for this uid
    			for (JoinBean joinBean : values) {
    				String table = joinBean.getTables();
    				if (table.equals("orders")) {
    					// cache the order record
    					JoinBean orders = new JoinBean();
    					// an order record only carries uid and oid
    					orders.setUid(joinBean.getUid());
    					orders.setOid(joinBean.getOid());
    					list.add(orders);
    				}else {
    					// cache the user record
    					JoinBean users = new JoinBean();
    					users.setUid(joinBean.getUid());
    					users.setName(joinBean.getName());
    					users.setAge(joinBean.getAge());
    					users.setFriend(joinBean.getFriend());
    					map.put(users.getUid(), users);
    				}
    			}
    		}
    		@Override
    		protected void cleanup(Reducer<Text, JoinBean, Text, NullWritable>.Context context)
    				throws IOException, InterruptedException {
    			for (JoinBean o : list) {
    				// look up the user that matches this order's uid
    				JoinBean user = map.get(o.getUid());
    				// concatenate the joined fields
    				String res = o.getOid() + "," + user.toString();
    				// write out the joined record
    				context.write(new Text(res), NullWritable.get());
    			}
    		}	
    	}
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		 conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
    		Job job = Job.getInstance(conf);
    		job.setMapperClass(JoinMapper.class);
    		job.setReducerClass(JoinReducer.class);
    		// output type settings:
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(JoinBean.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    		// input and output paths
    		FileInputFormat.setInputPaths(job, new Path("d:/data/orderdata/input/"));
    		FileOutputFormat.setOutputPath(job, new Path("d:/data/orderdata/output/"));
    		
    		boolean b = job.waitForCompletion(true);
    		// exit code: 0 means success, non-zero means failure
    		System.exit(b?0:-1);
    	}
    }
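For contrast with the reduce-side join above, here is a minimal map-side join sketch. It is not from the original post: it assumes the small user table fits in memory and is shipped to every map task through the distributed cache; the class name, cache-file URI, and paths are illustrative only.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoin {
	static class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		// uid -> full user line, built once per map task in setup()
		Map<String, String> users = new HashMap<>();
		Text k = new Text();

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			// "user" is the symlink name given by the #user fragment in the cache-file URI below
			try (BufferedReader br = new BufferedReader(new FileReader("user"))) {
				String line;
				while ((line = br.readLine()) != null) {
					String[] split = line.split(",");
					users.put(split[0], line);
				}
			}
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// each input record is an order line: oid,uid
			String[] split = value.toString().split(",");
			String user = users.get(split[1]);
			if (user != null) {
				k.set(split[0] + "," + user); // the join happens here, before any shuffle
				context.write(k, NullWritable.get());
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(MapJoin.class);
		job.setMapperClass(MapJoinMapper.class);
		job.setNumReduceTasks(0); // map-only job
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		// ship the small table to every node; "#user" creates a local symlink named "user"
		job.addCacheFile(new URI("hdfs://linux01:9000/data/orderdata/cache/user.txt#user"));
		FileInputFormat.setInputPaths(job, new Path("hdfs://linux01:9000/data/orderdata/input/orders.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://linux01:9000/data/orderdata/output_mapjoin/"));
		System.exit(job.waitForCompletion(true) ? 0 : -1);
	}
}
```

Because the join happens entirely in the mapper and the job runs with zero reducers, no data goes through the shuffle, which is why a map-side join is usually much cheaper than a reduce-side join when one table is small.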
    
    
    
    

## 3. Common-friends example

  • Find the common friends of every pair of users. Each input line is a person followed by that person's friend list:

    A:B,C,D,F,E,O
    B:A,C,E,K
    C:F,A,D,I
    D:A,E,F,L
    E:B,C,D,M,L
    F:A,B,C,D,E,O,M
    G:A,C,D,E,F
    H:A,C,D,E,O
    I:A,O
    J:B,O
    K:A,C,D
    L:D,E,F
    M:E,F,G
    O:A,H,I,J
    
    Analysis:
    Line 1: A and B are friends, A and C are friends, A and D are friends, A and F are friends, A and E are friends, A and O are friends
    Line 2: B and A are friends, B and C are friends, B and E are friends, B and K are friends
    ...
    So, for example:
    common friends of A and B --> C  E
    
  • Code:

    package com.xjk.Friend;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    
    public class SameF1 {
    	static class SameF1Mapper extends Mapper<LongWritable, Text, Text, Text>{
    		Text k = new Text();
    		Text v = new Text();
    		@Override
    		protected void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			String line = value.toString();
    			String[] split = line.split(":");
    			String[] fs = split[1].split(",");
    			v.set(split[0]);
    			for (String f : fs) {
    				k.set(f);
    				context.write(k, v);
    			}
    			
    		}
    	}
    	static class SamF1Reducer extends Reducer<Text, Text, Text, Text>{
    		@Override
    		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
    				throws IOException, InterruptedException {
    			List<String> list = new ArrayList<>();
    			for (Text text:values) {
    				list.add(text.toString());// the key is a shared friend; each value is a person who has that friend
    			}
    			// sort so each pair is always emitted in the same order (avoids both B-C and C-B)
    			Collections.sort(list);
    			
    			for (int i = 0; i < list.size()-1; i++) {
    				for (int j = i+1; j < list.size(); j++) {
    					// b-c a 
    					// b-d a
    					// b-e a
    					context.write(new Text(list.get(i) + "-" + list.get(j) + "的好友是:"), key);
    				}
    			}
    		}
    	}
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		 conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
    		Job job = Job.getInstance(conf);
    		job.setMapperClass(SameF1Mapper.class);
    		job.setReducerClass(SamF1Reducer.class);
    		// output type settings:
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		// input and output paths
    		FileInputFormat.setInputPaths(job, new Path("d:/data/Friends/input/"));
    		FileOutputFormat.setOutputPath(job, new Path("d:/data/Friends/output/"));
    		
    		boolean b = job.waitForCompletion(true);
    		// exit code: 0 means success, non-zero means failure
    		System.exit(b?0:-1);
    	}
    }
    
    
    • The first pass emits one line per (pair, shared friend), so the same pair appears many times:
    B-C的好友是:	A
    B-D的好友是:	A
    B-F的好友是:	A
    B-G的好友是:	A
    B-H的好友是:	A
    B-I的好友是:	A
    B-K的好友是:	A
    B-O的好友是:	A
    C-D的好友是:	A
    C-F的好友是:	A
    C-G的好友是:	A
    C-H的好友是:	A
    C-I的好友是:	A
    
  • Code 2 (second pass: aggregate the shared friends of each pair):

    package com.xjk.Friend;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    
    public class SameF2 {
    	static class SameF2Mapper extends Mapper<LongWritable, Text, Text, Text>{
    		Text k = new Text();
    		Text v = new Text();
    		@Override
    		protected void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			String line = value.toString();
    			// the first-pass output is tab separated: "X-Y的好友是:" \t friend
    			String[] split = line.split("\t");
    			k.set(split[0]);
    			v.set(split[1]);
    			context.write(k, v);
    		}
    	}
    	static class SamF2Reducer extends Reducer<Text, Text, Text, Text>{
    		Text v = new Text();
    		@Override
    		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
    				throws IOException, InterruptedException {
    			StringBuilder sb = new StringBuilder();
    			for (Text text : values) {
    				sb.append(text.toString() + " ");
    			}
    			v.set(sb.toString().trim());
    			context.write(key, v);
    		}
    	}
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		 conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
    		Job job = Job.getInstance(conf);
    		job.setMapperClass(SameF2Mapper.class);
    		job.setReducerClass(SamF2Reducer.class);
    		// output type settings:
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		// input and output paths
    		FileInputFormat.setInputPaths(job, new Path("d:/data/Friends/output/"));
    		FileOutputFormat.setOutputPath(job, new Path("d:/data/Friends/output2/"));
    		
    		boolean b = job.waitForCompletion(true);
    		// exit code: 0 means success, non-zero means failure
    		System.exit(b?0:-1);
    	}
    }
    
    
    • Aggregated result (one line per pair with all its shared friends):
    A-B的好友是:	E C
    A-C的好友是:	D F
    A-D的好友是:	E F
    A-E的好友是:	D B C
    A-F的好友是:	O B C D E
    A-G的好友是:	F E C D
    A-H的好友是:	E C D O
    A-I的好友是:	O
    

## 4. YARN

  • The key idea of HDFS is to store data across many machines (distributed storage). Computation is distributed in the same way: a job is split into tasks that run in parallel on different machines, and the machines' processing resources (CPU, memory) are pooled and presented to applications as one unified resource. The unified resource manager that does this is YARN. It provides resource management and scheduling for the applications above it, and its introduction brought big gains in cluster utilization, unified resource management, and data sharing. YARN's responsibilities:

    1. Accept job submissions from clients
    2. Manage the resources of every node in the cluster
    3. Assign tasks and monitor how tasks run on each node
    
  • Resource scheduling policies:

    • FIFO (deprecated)
    FIFO was the default scheduler in Hadoop 1.x. It keeps jobs in a single queue and serves them one by one in submission order: the job at the head of the queue is given free server nodes for its map tasks and reduce tasks until that job finishes.

    • Capacity Scheduler
    Supports multiple queues, each configured with a share (capacity) of the cluster's resources; jobs submitted to a queue are scheduled within that queue's capacity.

In the YARN framework the scheduler is a very important component. With suitable scheduling rules, many applications can work on the cluster at the same time in an orderly way. The most primitive rule is FIFO: whichever job was submitted first runs first. But then a single large job can monopolize the resources while everything else keeps waiting, or a pile of small jobs can hold the resources so that a large job never gets enough and starves. So although FIFO is simple, it does not meet practical needs.


- Fair Scheduler

Supports multiple queues; each queue can be assigned a certain amount of resources, and the jobs in a queue share all of that queue's resources fairly.

Jobs in a queue receive resources according to their priority: the higher the priority, the more resources, but every job is guaranteed some share so that the scheduling stays fair. A job's priority is determined by the difference between its ideal resource share and the resources it has actually received: the bigger the deficit, the higher the priority.


## 5. Installing YARN

cd /opt/hdp/hadoop-2.8.5/etc/hadoop

vim yarn-site.xml
Add the following configuration:

<property>
	<name>yarn.resourcemanager.hostname</name>
	<value>linux01</value>
</property>
<property>
	<name>yarn.nodemanager.aux-services</name>
	<value>mapreduce_shuffle</value>
</property>
<property>
	<name>yarn.nodemanager.resource.memory-mb</name>
	<value>2048</value>
</property>
<property>
	<name>yarn.nodemanager.resource.cpu-vcores</name>
	<value>2</value>
</property>
<property>
	<name>yarn.nodemanager.vmem-check-enabled</name>
	<value>false</value>
</property>
<property>
	<name>yarn.nodemanager.vmem-pmem-ratio</name>
	<value>2.1</value>
</property>

Copy the file to linux02 and linux03:

scp yarn-site.xml linux02:$PWD
scp yarn-site.xml linux03:$PWD

Start HDFS with one command:

start-dfs.sh

Use jps to check that the NameNode and DataNode processes have started.

cd /opt/hdp/hadoop-2.8.5/sbin

Start YARN:

start-yarn.sh    # this starts the ResourceManager and the NodeManagers

stop-all.sh stops both HDFS and YARN.


## 6. Viewing resource usage in the YARN web UI

- http://10.0.0.134:8088/cluster


## 7. Submitting to YARN from Windows

- Configure the MapReduce job to use HDFS and run on YARN:

```java
Configuration conf = new Configuration();
// work against HDFS instead of the local file system
conf.set("fs.defaultFS", "hdfs://linux01:9000");
// run the job on yarn (the default is local)
conf.set("mapreduce.framework.name", "yarn");
// the resourcemanager host
conf.set("yarn.resourcemanager.hostname", "linux01");
// allow the job to be submitted across platforms (Windows -> Linux)
conf.set("mapreduce.app-submission.cross-platform", "true");
// get a job object for submitting the task
Job job = Job.getInstance(conf);
// path of the packaged job jar (a .jar file, not a directory)
job.setJar("d:/data/wc.jar");

// or resolve the jar automatically from the driver class
job.setJarByClass(JobSubmit2.class);
```

  • First delete the previous output directory on HDFS:
hdfs dfs -rm -r /data/wc/output
  • Example: create a new package and reuse the earlier word-count classes.


  • Modify DriverClass.java:
package com.xjk.yarn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverClass {
	public static void main(String[] args) throws Exception {
		// the user name used to access HDFS
		System.setProperty("HADOOP_USER_NAME", "root");
		// build the configuration; set everything before creating the Job,
		// because Job.getInstance(conf) takes a copy of the configuration
		Configuration conf = new Configuration();
		//conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
		conf.set("fs.defaultFS", "hdfs://linux01:9000");
		// run the job on yarn (the default is local)
		conf.set("mapreduce.framework.name", "yarn");
		// the resourcemanager host
		conf.set("yarn.resourcemanager.hostname","linux01");
		// allow the job to be submitted across platforms (Windows -> Linux)
		conf.set("mapreduce.app-submission.cross-platform","true");
		Job job = Job.getInstance(conf);
		// path of the packaged job jar
		job.setJar("E:/data/wc.jar");
		// mapper and reducer classes
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReduce.class);
		// map output key/value types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// reduce output key/value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// input path; by default each record is one line of text (offset + line)
		FileInputFormat.setInputPaths(job, new Path("hdfs://linux01:9000/data/wc/input/"));
		// output path
		FileOutputFormat.setOutputPath(job, new Path("hdfs://linux01:9000/data/wc/output/"));
		// number of reduce tasks
		job.setNumReduceTasks(2);
		// submit the job; passing true prints the job's progress to the console
		job.waitForCompletion(true);
	}
}
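For reference, WordCountMapper and WordCountReduce come from the earlier word-count post; an assumed, minimal equivalent (not the original author's code) is sketched below so the driver above is self-contained:

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	Text k = new Text();
	IntWritable one = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// split each line into words and emit (word, 1)
		String[] words = value.toString().split("\\s+");
		for (String w : words) {
			k.set(w);
			context.write(k, one);
		}
	}
}

class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		// sum the counts for each word
		int sum = 0;
		for (IntWritable v : values) {
			sum += v.get();
		}
		context.write(key, new IntWritable(sum));
	}
}
```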

  • Then use File > Export to package the project as E:/data/wc.jar, and run the main method.

  • If you get the error "System times on machines may be out of sync. Check system time and time zones":

The machines' clocks are probably out of sync; synchronize them with NTP.
  • View the results:
hdfs dfs -cat /data/wc/output/part-r-00001
hdfs dfs -cat /data/wc/output/part-r-00000

## 8. Submitting to YARN from Linux

  • Only the main method needs to change:
// build the configuration; set everything before creating the Job
Configuration conf = new Configuration();

// run the job on yarn (the default is local)
conf.set("mapreduce.framework.name", "yarn");
// the resourcemanager host
conf.set("yarn.resourcemanager.hostname","linux01");
Job job = Job.getInstance(conf);
// the jar is located from the driver class on the classpath
job.setJarByClass(DriverClass2.class);
// mapper and reducer classes
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);
// map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// reduce output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// input path; by default each record is one line of text (offset + line)
FileInputFormat.setInputPaths(job, new Path("hdfs://linux01:9000/data/wc/input/"));
// output path
FileOutputFormat.setOutputPath(job, new Path("hdfs://linux01:9000/data/wc/output2/"));
// number of reduce tasks
job.setNumReduceTasks(2);
// submit the job; passing true prints the job's progress to the console
job.waitForCompletion(true);
  • Then package it as wc2.jar, copy the jar onto a Linux machine, and run:
hadoop jar /wc2.jar com.xjk.yarn.DriverClass2
  • View the result:
hdfs dfs -cat /data/wc/output2/part-r-00001

## 9. Exam score example

score.txt

Compute each subject's average score and each subject's highest score.

If the data is skewed, a random number or the key's hashcode can be added to the key to spread the load across reducers (a sketch of this follows).
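A minimal sketch of that idea (not part of the original post): salt the subject key with a random suffix in the mapper so a hot subject is spread across several reducers; a second job would then strip the salt and merge the partial results. The class name and the number of salt buckets are illustrative.

```java
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Salts the subject key so one "hot" subject is handled by several reducers.
// Output keys look like "english-2"; a second pass must strip the "-2" and merge.
public class SaltedScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	private static final int SALTS = 3;      // assumed number of salt buckets
	private final Random random = new Random();
	private final Text k = new Text();
	private final IntWritable v = new IntWritable();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String[] split = value.toString().split(",");
		String course = split[0];
		// append a random salt (or e.g. Math.abs(split[1].hashCode()) % SALTS) to the key
		k.set(course + "-" + random.nextInt(SALTS));
		for (int i = 2; i < split.length; i++) {
			v.set(Integer.parseInt(split[i]));
			context.write(k, v);
		}
	}
}
```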

1. Compute the average score for each subject

package com.xjk.score;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StudentScore {
	static class StudentMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
		Text k = new Text();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] split = line.split(",");
			String course = split[0];
			k.set(course);
			for (int i = 2; i < split.length; i++) {
				int s = Integer.parseInt(split[i]);
				context.write(k, new IntWritable(s));
			}
		}
	}
	static class StudentReducer extends Reducer<Text, IntWritable, Text, Text>{
		@Override
		protected void reduce(Text key, Iterable<IntWritable> value, Reducer<Text, IntWritable, Text, Text>.Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			int count = 0;
			for (IntWritable i : value) {
				sum += i.get();
				count ++;
			}
			double avg = (double) sum / count; // cast to avoid integer division
			context.write(key, new Text(avg+""));
		}
	}
	public static void main(String[] args) throws Exception {
				// build the default configuration
				Configuration configuration = new Configuration();
				configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
				Job job = Job.getInstance(configuration);
				// mapper and reducer classes
				job.setMapperClass(StudentMapper.class);
				job.setReducerClass(StudentReducer.class);
				// map output key/value types
				job.setMapOutputKeyClass(Text.class);
				job.setMapOutputValueClass(IntWritable.class);
				// reduce output key/value types
				job.setOutputKeyClass(Text.class);
				job.setOutputValueClass(Text.class);
				// input path; by default each record is one line of text (offset + line)
				FileInputFormat.setInputPaths(job, new Path("d:/data/studentsocre/input"));
				// output path
				FileOutputFormat.setOutputPath(job, new Path("d:/data/studentsocre/output"));
				// number of reduce tasks
				job.setNumReduceTasks(1);
				// submit the job; passing true prints the job's progress to the console
				job.waitForCompletion(true);
	}
}
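The post stops at step 1; a minimal sketch for step 2, the highest score in each subject, is shown below. The class name and the output path are illustrative, and it assumes the same input layout (subject,name,score1,score2,...) as the averaging job above.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxScore {
	// same parsing as StudentMapper above: subject,name,score1,score2,...
	static class MaxScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		Text k = new Text();
		IntWritable v = new IntWritable();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split(",");
			k.set(split[0]);
			for (int i = 2; i < split.length; i++) {
				v.set(Integer.parseInt(split[i]));
				context.write(k, v);
			}
		}
	}
	static class MaxScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			// keep the maximum score seen for this subject
			int max = Integer.MIN_VALUE;
			for (IntWritable i : values) {
				max = Math.max(max, i.get());
			}
			context.write(key, new IntWritable(max));
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setMapperClass(MaxScoreMapper.class);
		job.setReducerClass(MaxScoreReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setNumReduceTasks(1);
		FileInputFormat.setInputPaths(job, new Path("d:/data/studentsocre/input"));
		FileOutputFormat.setOutputPath(job, new Path("d:/data/studentsocre/output_max"));
		System.exit(job.waitForCompletion(true) ? 0 : -1);
	}
}
```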

Original post: https://www.cnblogs.com/xujunkai/p/14176275.html