hadoop的三大核心组件之MapReaduce,hadoopmapreaduce
Hadoop的三大核心组件之MapReaduce
MapReduce是什么?
MR是一个分布式计算框架,它是Hadoop的一个程序,不会产生进程。
MR部分需要结合代码来理解学习,由于代码篇幅原因不方便截图,代码已经贴到github上,注释也挺详细,有需要的朋友可以去看,传送门:https://github.com/ZzzzZzreal/HadoopKeyPoints
(DATA文件夹是代码测试使用的数据,RESULT文件夹是代码测试结果)
======================================================================
A、自定义序列化类
代码及注释参见--https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/MyComparator.java
有时候,默认的数据类型不能满足我们的需求时,需要我们自定义序列化类,实现WritableComparable。在自定义的序列化类中,最重要的是重写compareTo方法以及序列化反序列化方法。序列化和反序列化的内容需要重点关注,容易犯低级错误。
★二次排序:compareTo方法也可以实现二次排序的功能,但会产生大量的序列化反序列化实例,浪费资源;比较优化的方法是在自定义序列化类中的一个静态内部类--Comparator,继承WritableComparator,在这个类中的compare方法中写排序的逻辑。需要对这个内部类进行注册。
B、Mapper---Mapper对来的每一条数据进行一次计算(这里的计算指的时代码逻辑,这句话的意思就是每来一条数据走一次map方法)
代码及注释--https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/wordcount/WordCountMapper.java
自定义Mapper,需要继承Mapper,并指定泛型<key-in,value-in,key-out,value-out>,然后重写map方法。泛型中的key-in,value-in是读取文件时读的内容,默认k按偏移量操作,v读一行内容,可以通过自定义输入格式来改变k-in和v-in;key-out和value-out是写入环形缓存区的内容,如果有Reducer的话这里的k-out和v-out最终是Reducer的key-in和value-in。
★shuffle过程--shuffle: 洗牌、发牌——(核心机制:数据分区,排序,缓存)
mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle。简单来说,就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序。
◆理解shuffle过程
MapReduce的核心与基础是Mapper类、Reducer类与Driver。Driver中主要是main()方法,MR的程序入口;Driver中还要规定job的各种配置。自己的Mapper需要继承Mapper类,重写其中的map()方法,自己的Reducer需要继承Reducer类,重写其中的reduce()方法。map的运行机制是来一条数据运行一次map方法,reduce的运行机制是来一个key运行一次reduce方法。数据从map中出来到进入reduce之前称为shuffle阶段,Mapper的数量不建议人为设定,一般一个block对应一个Mapper,而Reducer的数量可以在Driver中人为控制,不设定默认是1。
MR过程中的数据流向:一个文件在HDFS中是分布存储在不同节点的block中,每一个block对应一个Mapper,每一条数据以K,V的形式进入一个map()方法,map()方法中对数据进行处理(数据筛选,业务逻辑以及分区),再将处理结果以K,V的形式写入环形缓冲区,一个Mapper对应一个context,context对写入的数据按key进行聚合、排序、归约。context的大小默认为100M,当context容量达到80%或Mapper处理结束时,context会向外溢出,形成许多小文件,小文件为一个K和许多V的集合。处理完成后,这些文件会发送到Reducer所在节点,在该节点的context中,会对不同节点发送过来的数据按key进行再一次的聚合、排序和归约,最后进入Reducer,在reduce方法中对同一个<key,value集合>进行处理(业务逻辑),然后按照分区写入文件。
shuffle的处理任务:将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序;
1、maptask收集我们的map()方法输出的k、v对,放到内存缓冲区中
2、从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3、多个溢出文件会被合并成大的溢出文件
4、在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序
5、reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据
6、reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)
7、合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)
备注:Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认100M
C、Reducer---Reducer对相同的Key进行一次计算
代码及注释--https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/wordcount/WordCountReducer.java
自定义Reducer,需要继承Reducer,并指定泛型<key-in,value-in,key-out,value-out>,然后重写reduce方法。泛型中的k-in和v-in必须和Mapper的k-out和v-out的数据类型一致;可以通过自定义输出格式来改变k-out和v-out。
D、Driver---main方法,程序的入口
代码及注释--https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/wordcount/WordCountDriver.java
在main方法中需要获取一个配置实例,得到一个job实例,用这个job指定主类、Mapper类、Reducer类、Combiner类(如果有)、Partitioner类(如果有),指定Mapper和Reducer的输出格式类(如果Mapper和Reducer的输出类型相同,可以只设置outputKey和outputValue;如果不同则需要设置outputKey和outputValue、MapoutputKey和MapoutputValue),通过默认输入格式或自定义输入格式指定输入文件路径,指定输出目录,判断job结束并关闭程序。
E、Partitioner---分区类
代码及注释参见--https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/FlowSumPartitioner.java
自定义分区类需要继承Partitioner,指定泛型<k,v>,重写getPartition方法,它可以实现将不同的Key写入不同的文件。如果自定义了分区类,那么需要在Driver中指定分区类并且设置ReducTask数量(通过setNumReduceTasks方法)。
注意:每来一条数据走一次getPartition方法;有几个ReduceTask就会生成几个文件;1个task任务不要处理大于10G的内容;Partitioner的泛型要和Mapper的k-out、v-out一致。
F、Combiner---本地的reducer,只能起到过渡和优化的作用,它能做一些像归约类的对输出结果不造成影响的任务,比如求和
代码及注释--https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/flowsum/FlowSumCombiner.java
Combiner是一个本地reducer,所以它仍然继承Reducer,指定泛型<key-in,value-in,key-out,value-out>,它的key-in,value-in,key-out,value-out必须要和mapper的k-out,v-out一致,也和reducer的k-in,v-in一样(因为它只做简单优化,不能影响输出结果)。因为combiner只对reducer进行优化,所以它的逻辑可以跟reducer完全相同,也可以不一样,但是不能影响输出结果。而且,在数据量小的时候使用combiner与否几乎没什么差别。至于使用combiner的原因,是Reducer是在运行在网络环境上的,当数据量太大时,网络I/O速度慢,会导致效率低下。用本地的Reducer过渡,预处理可以提高效率。
G、自定义输入格式--InputFormat
代码及注释参见--https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/MyFileInputFormat.java
自定义输入格式,需要继承FileInputFormat。里面主要是重写createRecordReader方法,返回一个自定义的RecordReader。实际上还有一个重写方法是isSplitable,但是我们一般不作重写;因为Hadoop不适合管理小文件,所以我们需要这个方法的返回值一直时true,而在InputFormat的源码中, 方法是这样的:protected boolean isSplitable(JobContext context, Path filename) {return true;},所以我们不需要重写这个方法。
自定义RecordReader,它是主要的处理输入数据格式的类,最终是写初始化方法initialize和nextKeyValue
在自定义RecordReader中,需要重写的方法有:
①initialize()---初始化方法,完成自定义字段的初始化(以实现一次读取两行为例解释)
思路:因为LineReader可以完成一行一行读的目的,所以初始化时,要做的事情就是得到一个LineReader实例;通过查看API发现,想实例化一个LineReader最低要求是得到一个输入流,所以首先需要得到一个输入流;输入流可以通过文件对象open(Path)方法获得,所以我们的目的变成了获取一个文件对象和一个Path;文件对象可以通过get(Configuration)获取,Path可以通过FileSplit的getPath()获得;Configuration可以通过context得到,FileSplit可以通过split得到---context是一直贯穿于整个过程中的,split时initialize的参数。所以,将上面的思路倒序实现,就完成了初始化过程,得到一个LineReader的实例。
②nextKeyValue()---在这个方法中写逻辑,对K和V进行赋值
要实现一次读两行,而在初始化时得到的LineReader可以一次读一行,所以只需要读两次,然后赋值给Value就可以实现。
③getCurrentKey()---获取当前key
④getCurrentValue()---获取当前value
⑤getProgress()---获取进度,一般return true?0.0f:1.0f;
⑥close()---如果开了流必须要关闭,如果没开流则不需要
H、自定义输出格式--OutputFormat
代码及注释参见--https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/MyOutputFormat.java
自定义输出格式,需要继承FileOutputFormat,主要重写一个getRecordWriter方法,返回一个自定义的RecordWriter。自定义的RecordWriter中重写write方法和close。可以实现自定义输出文件名,也可以写逻辑改变输出内容。