数据压缩、数据倾斜join操作

1、数据压缩发生阶段

操作 压缩
数据源 》数据传输 数据压缩
mapper map端输出压缩
》数据传输 数据压缩
reducer reduce端输出压缩
》数据传输 数据压缩
结果数据

设置map端输出压缩:
1)开启压缩
conf.setBoolean

//开启map端输出压缩
conf.setBoolean("mapreduce.map.output.compress",true);

2)设置具体压缩编码
conf.setClass

//设置压缩方式
//conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);

设置reduce端输出压缩:
1)设置reduce输出压缩
FileOutputFormat.setCompressOutput

//设置reduce端输出压缩
FileOutputFormat.setCompressOutput(job,true);

2)设置具体压缩编码
FileOutputFormat.setOutputCompressorClass

//设置压缩方式
//FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);
//FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileOutputFormat.setOutputCompressorClass(job,DefaultCodec.class);

hive数据仓库:mapreduce 用hsql处理大数据

2、压缩编码使用场景

1-> Gzip压缩方式

压缩率比较高,并且压缩解压缩速度很快
hadoop自身支持的压缩方式,用gzip格式处理数据就像直接处理文本数据是完全一样
的;
在linux系统自带gzip命令,使用很方便简洁
不支持split
使用每个文件压缩之后大小需要在128M以下(块大小)
200M-》设置块大小

2->LZO压缩方式

压缩解压速度比较快并且,压缩率比较合理
支持split
在linux系统不可以直接使用,但是可以进行安装
压缩率比gzip和bzip2要弱,hadoop本身不支持
需要安装

3->Bzip2压缩方式

支持压缩,具有很强的压缩率。hadoop本身支持
linux中可以安装
压缩解压缩速度很慢

4->Snappy压缩方式

压缩解压缩速度很快,而且有合理的压缩率
不支持split

3、数据倾斜

reduce join

4、Hadoop中有哪些组件

HDFS:数据的分布式存储
MapReduce:数据的分布式计算
Yarn:资源调度(cpu/内存…)
Yarn节点:resourceManager
nodeManager

5、进行两个表的拼接

DistributedCacheMapper类

package com.hsiehchou.mapjoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
/**
* mapjoin
* 完成两张表数据的关联操作
*/
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
HashMap<String, String> pdMap = new HashMap<String, String>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//1.加载缓存文件
URI[] cacheFiles = context.getCacheFiles();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(cacheFiles[0].getPath()), "UTF-8"));
//这里可以将文件放在当前项目文件下,如果不放就用上面的那两句
//BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"));
String line;
//2.判断缓存文件不为空
while(StringUtils.isNotEmpty(line = br.readLine())){
//切割数据
String[] fields = line.split(" ");
//缓冲 到 集合; 商品ID 商品名
pdMap.put(fields[0],fields[1]);
}
br.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.获取数据
String line = value.toString();
//2.切分数据
String[] fields = line.split(" ");
//3.获取商品的pid,商品名称
String pid = fields[1];
String pName = pdMap.get(pid);
//4.拼接
line = line + " " + pName;
//5.输出
context.write(new Text(line),NullWritable.get());
}
}

DistributedCacheDriver类

package com.hsiehchou.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class DistributedCacheDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
//创建job任务
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//指定jar包位置
job.setJarByClass(DistributedCacheDriver.class);
//关联使用的Mapper
job.setMapperClass(DistributedCacheMapper.class);
//设置最终的输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置数据输入的路径
FileInputFormat.setInputPaths(job,new Path("e://test//table//in"));
//设置数据输出的路径
FileOutputFormat.setOutputPath(job,new Path("e://test//table//out"));
//加载缓存数据
job.addCacheFile(new URI("file:///e:/test/inputcache/pd.txt"));
//注意:没有跑reducer 需要指定reduceTask为0
job.setNumReduceTasks(0);
//提交任务
boolean rs = job.waitForCompletion(true);
System.exit(rs? 0:1);
}
}

本地模式测试

URI[] cacheFiles = context.getCacheFiles();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(cacheFiles[0].getPath()), "UTF-8"));

集群模式时

conf.set("mapreduce.framework.name", "yarn");yarn模式
job.addCacheFile(new URI("hdfs:///test2/pd.txt"));//添加hdfs文件做缓存
原文地址:https://www.cnblogs.com/hsiehchou/p/10408064.html