目录
一、MapReduce组件
1、Combiner-合并
可以在Driver类中通过job.setCombinerClass(XXXReducer.class);来设置Combiner类
Combiner实际上是在不改变计算结果前提的下来减少Reducer的输入数据量
在实际过程中,如果添加Combiner,那么可以有效的提高MapReduce的执行效率,缩短MapReduce的执行时间。
2、InputFormat-输入格式
①、InputFormat发生在MapTask之前。数据由InputFormat来负责进行切分和读取,然后将读取到的数据给MapTask处理,所以InputFormat读取出来的数据是什么类型,MapTask接收的数据就是什么类型
②、作用:
③、在MapReduce中,如果不指定,那么默认使用TextInputFormat。而TextInputFormat继承了FileInputFormat。默认情况下。FileInputFormat负责对文件进行切片处理;TextInputFormat赋值提供输入流来读取数据
④、FileInputFormat在对文件进行切片过程中的注意问题
⑤、 TextInputFormat在读取数据过程中需要注意的问题
⑥、自定义输入格式:定义一个类继承InputFormat,但是考虑到切片过程相对复杂,所以可以考虑定义一个类继承FileInputFormat,而在FileInputFormat中已经覆盖了切片过程,只需要考虑如何实现读取过程即可
⑦、多源输入:在MapReduce中,允许同时指定多个文件作为输入源,而且这多个文件可以放在不同的路径下。这多个文件的数据格式可以不同,可以为每一个文件单独指定输入格式
3、OutputFormat-输出格式
①、OutputFormat发生在ReduceTask之后,接收ReduceTask产生的数据,然后将结果按照指定格式来写出
②、作用:
③、在MapReduce中,如果不指定,默认使用的是TextOutputFormat。 TextOutputFroamt继承了FileOutputFormat。其中,FileOutputFormat负责对输出路径进行校验,TextOutputFormat则是对数据进行写出
④、在MapReduce中,也支持自定义输出格式以及多源数据,但是注意,实际开发中自定义输出格式以及多源输出用的非常少
二、Shuffle
1、Map端的Shuffle
①、当MapTask调用map方法处理数据之后,会将处理结果进行写出,写出到MapTask自带的缓冲区中。每一个MapTask都会自带一个缓冲区,本质上是一个环形的字节数组,维系在内存中,默认大小是100M
②、数据在缓冲区中会进行分区、排序,如果指定了combiner,那么还会进行合并。这次排序是将完全杂乱没有规律的数据整理成有序的数据,所以使用的是快速排序(Quick Sort)
③、当缓冲区使用达到指定阈值(默认是0.8,即缓冲区使用达到80%)的时候,会进行spill(溢写),产生一个溢写文件。因为数据在缓冲区已经分区且排序,所以产生的单个溢写文件中的数据是分好区且排好序的
④、溢写之后,MapTask产生的数据会继续写道缓冲区中,如果再次达到条件,会再次进行溢写。每一个溢写都会产生一个新的溢写文件。多个溢写文件之间的数据是局部有序但整体无序的。
⑤、当所有数据都处理完之后,那么MapTask会将所有的溢写文件进行合并(merge),合并成一个大的结果文件final out。在merge的时候,如果有数据依然在缓冲区中,那么会将缓冲区中的数据直接merge到final out中
⑥、在merge过程中,数据会再次进行分区且排序,因此final out 中的数据是分好区且排好序的。如果溢写文件个数达到3个及以上,并且指定了Combiner,那么在merge过程中还会进行combine这次排序是将局部有序的数据整理成整体有序的状态,所以采用的是归并排序(Merge Sort)
⑦、注意问题
2、Reduce端的Shuffle
①、当ReduceTask达到启动阈值(默认是0.05,即当有5%的MapTask结束)的时候,就会启动来抓取数据
②、ReduceTask启动之后,会在当前服务器上来启动多个(默认是5个)fetch线程来抓取数据
③、fetch线程启动之后,会通过HTTP请求中的get请求来获取数据,在发送请求的时候会携带分区号作为参数
④、fetch线程会将抓取来的数据临时存储到本地磁盘上,形成一个个的小文件
⑤、当所有的fetch抓取完数据之后,ReduceTask会将这些小文件进行merge,合并成一个大文件。在merge过程中,会对数据再次进行排序。这次排序是将局部有序的数据整理成整体有序的状态,所以采用的是归并排序
⑥、merge完成之后,ReduceTask会将相同的键对应的值分到一组去,形成一个(伪)迭代器(本质上是一个基于迭代模式实现的流),这个过程称之为分组(group)
⑦、分组之后,每一个键调用一次reduce方法
3、MapReduce执行流程
4、Shuffle优化
①、适当的增大缓冲区。实际过程中,可以缓冲区设置为250M~400M之间
②、增加Combiner,但是不是所有场景都适合于使用Combiner
③、可以考虑对结果进行压缩传输。如果网络条件比较差,那么可以考虑将final out文件压缩之后再传递给ReduceTask,但是ReduceTask收到数据之后需要进行解压,所以这种方案是在网络传输和压缩解压之间的一种取舍
④、适当的考虑fetch线程的数量。
三、扩展
1、小文件问题
①、在大数据环境下,希望所处理的文件都是大文件,但是在生产环境中,依然不可避免的会产生很多小文件
②、小文件的危害
③、到目前为止,市面上针对小文件的处理手段无非两种:合并和打包
④、Hadoop针对小文件提供了原生的打包手段:Hadoop Archive,将指定小文件打成一个har包
2、压缩机制
①、MapReduce支持对数据进行压缩:可以对MapTask产生的中间结果(final out)进行压缩,也支持对ReduceTask的输出结果进行压缩
②、在MapReduce中,默认支持的压缩格式有:Default,BZip2,GZip,Lz4,Spappy,ZStandard,其中比较常用的是BZip2
③、主要是在Driver来进行设置,设置语句也比较简单:
package org.example.compress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
// import org.apache.hadoop.io.compress.CompressionCodec;
// import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class CompressDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// 设置参数
// 开启Mapper结果的压缩机制
// conf.set("mapreduce.map.output.compress", "true");
// 设置压缩编码类
// conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
Job job = Job.getInstance(conf);
job.setJarByClass(CompressDriver.class);
job.setMapperClass(CompressMapper.class);
job.setReducerClass(CompressReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/txt/words.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/result/compress"));
// 对Reduce结果进行压缩
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
job.waitForCompletion(true);
}
}
3、推测执行机制
①、推测执行机制本质上是MapReduce针对慢任务的一种优化。慢任务指的是其他任务都正常执行完,但是其中几个任务依然没有结束,那么这几个任务就称之为慢任务
②、旦出现了慢任务,那么MapReduce会将这个任务拷贝一份放到其他节点上,两个节点同时执行相同的任务,谁先执行完,那么它的结果就作为最终结果;另外一个没有执行完的任务就会被kill掉
③、慢任务出现的场景
④、在实际生产过程中,因为数据倾斜导致慢任务出现的机率更高,此时推测执行机制并没有效果反而会占用更多的集群资源,所以此时一般会考虑关闭推测执行机制
⑤、推测执行机制配置(放在mapred-site.xml文件中)true是开着的,false是关闭
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
</property>
4、数据倾斜
①、数据倾斜指的是任务之间处理的数据量不均等。例如统计视频网站上各个视频的播放量,那么此时处理热门视频的任务索要处理的数据量就会比其他的任务要多,此时就产生了数据倾斜
②、Map端的数据倾斜的产生条件:多源输入、文件不可切、文件大小不均等。一般认为Map端的倾斜无法解决
③、实际开发中,有90%的数据倾斜发生在了Reduce端,直接原因就是因为是对数据进行分类,本质原因是因为数据本身就有倾斜的特性,可以考虑使用二阶段聚合的方式来处理Reduce端的数据倾斜
5、join
①、如果在处理数据的时候,需要同时处理多个文件,且文件相互关联,此时可以考虑将主要处理的文件放在输入路径中,将其他关联文件缓存中,需要的时候再从缓存中将文件取出来处理
②、案例:统计每一天卖了多少钱