hadoop03

hadoop 03

1.checkpoint机制

  • hadoop如何进行checkpoint的呢?

    hdfs的源数据记录在内存中,它是一个对象。当客户端进行hdfs操作(rm mkdir...),然后传输给服务端namenode进行解析,执行更新操作,为了保证数据持久化,它会把数据进行序列化存储,当然它不会每操作一次进行序列化一次,否则资源消耗太大,它将操作命令记录在日志中,记录日志文件伴随记录日志文件越来越大,它会有个最大存储空间,当到大了最大存储空间会再生成一个日志文件进行记录(日志1,日志2,日志3...),而namenode会定期的把持久化文件和日志文件传输到secondary namenode ,传输到拷贝文件在secondary会将内容反序列化存储对象,将加载传输日志操作进行更新,会形成一个新的存储对象,将新的存储对象序列化,形成新的存储对象会传输到namenode中并进行更新操作。
    
  • hadoop checkpoint机制 默认1小时合并或一分钟内操作100万次操作进行合并。

2.windows下本地运行mapreduce

  • 下载hadoop并解压。如果运行时报空指针异常需要下载对应版本hadoop.dll和winutils.exe hadoop.dll放在c://windows/system32 。winutils.exe放在hadoop/bin下。

下载winutils.exe

https://github.com/cdarlint/winutils

3.MapReduce

  • MapReduce主要分为2个部分,编程模型和运行时环境

    编程模型:提供简单的接口,实现几个简单函数实现分布式程序
    运行时环境:比较复杂节点的通信,节点失效,数据切分等,也可运行在YARN平台(负责资源调度),
    
  • 特点:

    易于编程,良好扩展性,高的容错性
    
  • MapReduce组件

    InputFormat
    Mapper
    Parititioner(分区)
    Reduce
    
  • 它的模式分为

    local模式(本地)
    yarn集群模式
    
  • 因为单台机器运算能力有限,mapreduce是分布式的运算框架,可以解决海量数据的快速运算。假如你有3台机器,三台机器指定任务为止,并行运算。

  • 以单词统计为例:

    我们有一个文件/data/txt/1.txt   我们通过任务划分让不同机器处理不同范围数据。输出中间结果存储在不同两个任务,通过hashcode指定中间结果分配到任务区。由Reduce处理指定任务。
    
  • 运算整体的逻辑被分成两步map(分区)reduce(聚合)。hashcode%reduce机器的个数,将key分组,reduce处理指定任务。

  • 我们做文件处理时候关键点:

    1.做任务存储切块。
    2.做任务的切片。
    3.分区保证相同key在同一个reduce中,使用hashcode算法。
    

2.0前序-单词统计:

  • 当前要统计一个文本单词数量那么如何做呢?map-reduce

    假如我们对一个300M文件进行单词统计,集群中我们配置3台机器(l1,l2,l3)。文件分block块存储在3台机器上:
    l1 0M-128M
    l2 128M-256M
    l3 257M-300M
        
    我们通过编写的map分布在每天机器上,每台机器上通过读取文本中指定偏移内容,通过map读取每一行的数据(每读取一行信息map调用一次),并进行计数1。然后通过hashcode它们的key 映射到相应机器的reduce中进行汇总,映射到reduce对不同的key分组,key有多少组,reduce调用读诵好次。
    
    • WordCountMapper.java
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /*
     *1.map端输入是偏移量和行数据key-value long string
     *2.map端输出key-value数据
     *3.map输出为reduce的输入key-value序列化
     *Sring Text
     *long Longwritable
     *int IntWritable
     * 
     */
    
    public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    		// value转换字符串
    		String line = value.toString();
    		// 切割
    		String[] words = line.split(" ");
    		for(String word:words) {
    			// 将处理结果写出key value形式,到reduce端
    			context.write(new Text(word), new IntWritable(1));
    		}
    	}
    }
    
    
    • WordCountReduce.java
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    /*
     *KEYIN Text
     *VALUEIN IntWritable
     * 
     */
     
    public class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
    	protected void reduce(Text key, Iterable<IntWritable> iters, Reducer<Text,IntWritable,Text,IntWritable>.Context context) throws java.io.IOException ,InterruptedException {
    		// 遍历循环
    		int count  = 0;
    		for(IntWritable intWritable :iters) {
    			count ++;
    		}
    		context.write(key, new IntWritable(count));
    	};
    }
    
    
    • 上述mapper的key-value会存在问题,如果我们有100万行数据就会创建100万key和value显然性能是很有影响的。
    public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        // 创建k-v对象。
    	Text k = new Text();
    	IntWritable v = new IntWritable();
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    		// value转换字符串
    		String line = value.toString();
    		// 切割
    		String[] words = line.split(" ");
    		for(String word:words) {
    			k.set(word);
    			v.set(1);
    			context.write(k,v);
    		}
    	}
    }
    
    

2.1分布式统计单词

1.Map程序:

  • 读取HDFS指定范围的文件内容,并进行处理,写入到不同文件中。
package com.xjk.mapreduce;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/*
 * 1.读取HDFS数据 /data/
 * 
 * */
public class MapTask {
	public static void main(String[] args) throws Exception {
		// 在执行程序的时候主函数是可以接收参数的
		// 在主程序运行时候指定读取文件
		String path = args[0];
		// 读取任务起始位置
		long start = Long.parseLong(args[1]);
		// 读取任务长度
		long length = Long.parseLong(args[2]);
		// 任务编号
		String taskId = args[3];
		// 获取操作hdfs客户端对象
		Configuration conf = new Configuration();
		// FileSystem t = FileSystem.newInstance(uri, conf, user)()
		FileSystem fs = FileSystem.newInstance(new URI("hdfs://linux01:9000/"), conf, "root");
		//FileSystem fs = FileSystem.get(new URI("hdfs://linux01:9000/"), conf, "root");
		// 创建写对象
		FSDataOutputStream out0 = fs.create(new Path("/data/wc/output/map_" + taskId+ "_0"));
		FSDataOutputStream out1 =fs.create(new Path("/data/wc/output/map_" + taskId+ "_1"));
		// 要读取文件
		Path file = new Path(path);
		// 读取hdfs指定数据
		FSDataInputStream fin = fs.open(file);
		if (start!=0) {
			fin.seek(start);// 表示从起始位置开始读
		}
		BufferedReader br = new BufferedReader(new InputStreamReader(fin));
		String line = null;
		long count = 0;
		if (start !=0) {
			br.readLine();// 如果当前读取行数据不在起始位置,就会将当前行数据读取,舍去
		}
		
		
		while ((line = br.readLine()) != null) {
			count +=line.length()+2;//换行存在/r/n情况
			String[] words = line.split(" ");
			// 将单词写到不同文件中 根据单词hashcode reduce 个数为2
			for (String word: words) {
				if((word.hashCode()&Integer.MAX_VALUE)%2==0) {
					out0.write((word + "	" + 1 + "
").getBytes());
				}else {
					out1.write((word + "	" + 1 + "
").getBytes());
				}
			}
			if (count > length) {
				break;
			}
			
		}
		out0.close();
		out1.close();
		br.close();
		fin.close();
		fs.close();
		
		
	}
}

2Reduce程序:

package com.xjk.ts;


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

/*
 * 1.根据任务编号来处理对应数据
 * 2.
 * */
public class ReduceTask {
	public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
		Map<String,Integer> map = new HashMap<>();
		String taskId = args[0];
		// 读取HDFS的数据
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.newInstance(new URI("hdfs://linux01:9000/"), conf, "root");
		//FileSystem fs = FileSystem.get(new URI("hdfs://linux01:9000/"), conf, "root");
		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/data/wc/output"), false);
		while (listFiles.hasNext()) {
			LocatedFileStatus next = listFiles.next();
			Path path = next.getPath();
			String fileName = path.getName();
			if (fileName.endsWith(taskId)) {
				// 读取当前文件
				FSDataInputStream fin = fs.open(path);
				BufferedReader br = new BufferedReader(new InputStreamReader(fin));
				String line = null;
				while ((line=br.readLine()) != null) {
					String[] split = line.split("	");
					String word = split[0];
					map.put(word, map.getOrDefault(word,0)+1);
				}
				br.close();
				fin.close();
			}
		}
		FSDataOutputStream out = fs.create(new Path("/data/wc/res/readuce_" + taskId));
		
		// map
		Set<Entry<String,Integer>> entrySet = map.entrySet();
		for (Entry<String, Integer> entry:entrySet) {
			out.write((entry.getKey()+ "	" + entry.getValue()+"
").getBytes());
		}
		out.close();
		//fs.close();
	}
}

  • 将项目打jar包
右键项目->Export->JAR File->选取路径
  • 上传jar和源数据文件
  • 执行命令:
执行命令:

map:
			执行类Map类				指定源数据文件路经		起始位置  提取字节  任务id
hadoop jar wc.jar com.xjk.ts.MapTask /data/wc/input/word.txt 0 2000 0

reduce:
			执行reduce类			    任务id
hadoop jar wc.jar com.xjk.ts.ReduceTask 0




如报错:Filesystem closed、InterruptedIOException、The client is stopped、IOException

方式1:
conf.set("fs.hdfs.impl.disable.cache", "true");
方式2:
<property>
  <name>fs.hdfs.impl.disable.cache</name>
  <value>true</value>
  <description></description>
</property>
并且加上:
FileSystem fs = FileSystem.newInstance(new URI("hdfs://linux01:9000/"), conf, "root");
// 原因:
参数fs.hdfs.impl.disable.cache默认为false,于是这个conf被Cache,导致在方法外的FileSystem closed,设置为true即可。
get方法不是每次都创建FileSystem对象,会从缓存中获取FileSystem对象,而newInstance方法则会每次都创建新对象。所以在使用该对象的API编程时,推荐使用get方法。
注意:用get不能close,否则多线程报错(所以我用static),而用newInstance必须每次close。
  • 小插曲:
find命令:
# 查找根目录下以jar结尾   xargs打印一行, sed 's/ /:/g'将所有空格替换成:
find / -name "*.jar" | xargs | sed 's/ /:/g'

2.3程序运行

  • 任务执行:

    1.MapTask 和ReduceTask 打包
    2.准备数据
    3.将jar包上传linux机器上
    4.在不同机器上执行MapTask ReduceTask任务,通过参数传递任务量。
    
    

4.单机统计单词示例

  • 统计一个文件夹下单词数量:
WordCountMapper.java
package com.xjk.wordcounts;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper  extends Mapper<LongWritable, Text, Text, IntWritable>{
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    		throws IOException, InterruptedException {
    	String line = value.toString();
    	String[] words = line.split(" ");
    	for (String word : words) {
    		//将处理的结果写出  key value ----> reduce端
    		context.write(new Text(word), new IntWritable(1));
		}
    }

}
// WordCountReduce.java
package com.xjk.wordcounts;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> iters,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		//hello   6 
		int count = 0 ;
		for (IntWritable intWritable : iters) {
			count ++ ;
		}
		context.write(key, new IntWritable(count));
	}
}

// DriverClass.java

package com.xjk.wordcounts;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverClass {
	public static void main(String[] args) throws Exception {
		// 生成默认配置
		Configuration configuration = new Configuration();
		configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
		Job job = Job.getInstance(configuration);
		// map和reduce的类
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReduce.class);
		// map输出k-v类型,
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		//reduce输出k-v类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		//输入数据  设置默认处理文件路径,默认处理文本数据long line
		FileInputFormat.setInputPaths(job, new Path("d:/wc/"));
		//输出数据路径
		FileOutputFormat.setOutputPath(job, new Path("d:/wc/res"));
		// 设置reduce数量
		job.setNumReduceTasks(2);
		// 将任务提交,默认在本地运行true将job执行消息打印在控制台上。
		job.waitForCompletion(true);
	}
}


  • reduce可自己进行数量设置。map启动数量默认是按照文件格式来进行map任务启动。比如你启动10个maptask,就会有10个maptask.但是这10个map小于128M.如果高于128M就会进行任务切片。

5.统计电影平均分

  • 设计

    Mapper中 以电影名字做为key,分数做为value
    根据电影hashcode取余进行分组,分给不同reducer
    
  • 代码

    // MovieMapper.java
    import java.io.IOException;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import com.google.gson.Gson;
    
    // 电影数据输入k-v 输出k-v
    public class MovieMapper extends Mapper<LongWritable, Text,Text,DoubleWritable> {
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 读取每行数据
    		String line =value.toString();
    		// 将每行数据转换成对象
    		Gson gs = new Gson();
    		// 用Gson 将每行数据转成java对象
    		MoiveBean mb = gs.fromJson(line, MoiveBean.class);
    		// 输出k-v 电影-分数
    		context.write(new Text(mb.getMovie()), new DoubleWritable(mb.getRate()));
    		
    	}
    }
    // MobieReduce.java
    import java.io.IOException;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MovieReduce extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
    	// 相同电影id会执行一次这个方法
    	// Iterable存储相同一组电影Id
    	@Override
    	protected void reduce(Text movie, Iterable<DoubleWritable> iters,
    			Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
    		// 总分
    		double sum = 0;
    		// 数量统计
    		int count = 0;
    		for (DoubleWritable doubleWritable: iters) {
    			sum += doubleWritable.get();
    			count++;
    		}
    		// 平均分
    		double avg = sum / count;
    		// 写出
    		context.write(movie, new DoubleWritable(avg));
    	}
    }
    // MovieBean.java
    public class MoiveBean {
    	private String movie;
    	private double rate;
    	private String timeStamp;
    	private int uid;
    	@Override
    	public String toString() {
    		return "MoiveBean [movie=" + movie + ", rate=" + rate + ", timeStamp=" + timeStamp + ", uid=" + uid + "]";
    	}
    	public String getMovie() {
    		return movie;
    	}
    	public void setMovie(String movie) {
    		this.movie = movie;
    	}
    	public double getRate() {
    		return rate;
    	}
    	public void setRate(double rate) {
    		this.rate = rate;
    	}
    	public String getTimeStamp() {
    		return timeStamp;
    	}
    	public void setTimeStamp(String timeStamp) {
    		this.timeStamp = timeStamp;
    	}
    	public int getUid() {
    		return uid;
    	}
    	public void setUid(int uid) {
    		this.uid = uid;
    	}
    }
    
    // MovieDriver.java
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MovieDriver {
    	public static void main(String[] args) throws Exception {
    		Configuration configuration = new Configuration();
    		configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
    		Job job = Job.getInstance(configuration);
    		// 指定map和reduce的类
    		job.setMapperClass(MovieMapper.class);
    		job.setReducerClass(MovieReduce.class);
    		// map 输出k-v
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(DoubleWritable.class);
    		// reduce 输出k-v
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(DoubleWritable.class);
    		// 输入数据文件路经
    		FileInputFormat.setInputPaths(job, new Path("d:/data/sourcedata/simple"));
    		//输出数据路径
    		FileOutputFormat.setOutputPath(job, new Path("d:/data/movie"));
    		// 设置reduce数量
    		job.setNumReduceTasks(2);
    		// 将任务提交,默认在本地运行true将job执行消息打印在控制台上。
    		job.waitForCompletion(true);
    	}
    }
    
    
原文地址:https://www.cnblogs.com/xujunkai/p/14055798.html