欢迎投稿

今日深度:

Hadoop(二),

Hadoop(二),


----------------------------1701B—裴博润------------------------------
     ---*-----------------------复习-------------------------*---
Linux
Linux是一套免费使用和自由传播的类Unix的操作系统
c语言编写
以网络为核心
特性:
(1)一切皆文件
(2)每个软件都有固定的用途
免费开源   多用户、多任务  良好的界面(字符界面和图面界面)
常用版本:
Red Hat 商业版(收费)
Fedora Core 由原来的Red Hat桌面版发展而来,免费(家用)
Centos:Red Hat社区克隆版本,免费
Debian:经常应用于服务器,性能稳定
Ubuntu:Debian衍生而来,比较流行的桌面系统
Fedora:急于尝试新技术,等不及稳定版本,一个测试平台


Hadoop
三大核心:
HDFS(分布式文件存储系统) Yarn(分布式的资源管理器)  MapReduce(分布式的计算框架)
四大模块:
Common:是Hadoop的基础设施,为其他模块提供基础服务
HDFS:Hadoop的分布式文件存储系统
MapReduce:一个分布式的高容量,大吞吐量的计算框架
Yarn:一个分布式的资源管理框架
特点:
高可靠、高扩展、高效性、高容错
缺点:
不适合大量存储小文件
不适合低延迟数据访问(不适合快速访问)
不支持多用户的写入和修改操作,支持单用户的写入
hdfs:
NameNode:存储元数据(命名空间、生成时间、大小、目录等),校验和
SecondaryNameNode:FSimage(映射文件) 和 edits(事务日志) 进行合并
DataNode:存储数据
hdfs的读写操作:
8、HDFS读文件流程
1)先通过客户端调用FileSyStem对象的.open()方法打开HDFS中需要读取的文件
2)FileSyStem通过远程协议调用NameNode,确定要访问的文件的数据块的位置;NameNode返回一个含有数据块的“元数据”信息(即文件的基本信息);然后,DataNode按照NameNode定义的距离值进行排序,如果客户端本身就是一个DataNode,那么会优先从本地的DataNode节点上进行数据读取;
返回一个InputStream给客户端,让其从FSDataInputStream中读取数据,FSDataInputStream接着包装一个DFSInputStream,用来管理DataNode和NameNode的I/O
3)Namenode向客户端返回一个包含数据块信息的地址,客户端会根据创建一个FSDataInputStream,开始对数据进行读取
4)FSDataInputStream根据开始时候存放的位置,连接到离它最近的DataNode,对其上数据进行从头读取操作。读取过程中客户端会反复调用.read()方法,以I/O(流式方式)从DataNode上访问读取数据
5)当读取到Block的最后一块时,FSDataInputStream会关闭掉当前DataNode的链接,然后查找能够读取的下一个Block所在的距离当前最近的DataNode
6)读取完之后调用.close()方法,关闭FSDataInputStream




9、HDFS写文件流程
1)客户端调用FileSyStem的.create()方法来请求创建文件
2)FileSyStem通过NameNode发送请求,创建一个新文件,但此时并不关联其它任何数据块。NameNode进行很多检查保证不存在要创建的文件已经在与HDFS系统当中,同时检查是否有相应的权限来创建这个文件。如果这些检查都已完成,那么NameNode就会记录下来这个新建的文件的信息。FileSyStem就返回一个FSDataOutputStream给客户端让它来写数据。和度的情况一样,FSDataOutputStream将会包装一个DFSOutputStream用于和DataNode以及NameNode进行通讯的。一旦文件创建失败了呢?客户端会接受到一个IOException,表示文件创建失败,停止后续的所有任务
3)客户端开始写数据。FSDataOutputStream把要写入的数据分成块的形式,将其写入到队列中。其中的数据有DataStreamer读取(DataStreamer的职责:让NameNode分配新的块--通过找到合适的DataNode来存储备份的副本数据)这些DataNode组成一条流水线,假设是一个三级流水线,那么里面含有三个节点。此时DataStreamer把数据首先写入到离它最近的DataNode上(第一个节点);然后由第一个节点将数据块写入到第二个节点上;第二个节点继续把数据块传送到第三个节点上
4)FSDataOutputStream维护了一个内部关于write packet的队列,里面存放了等待DataNode确认
无误的packets信息。这个队列称为等待队列。一个packet的信息被移出本队列的前提是当packet流水线中的所有节点确认无误


5)当完成数据写入操作之后,客户端会调用.close()方法,在通知NameNode它写数据完成之前,这个方法将Flush(刷新)残留的packets,并且等待信息确认,NameNode已经知道文件由哪些数据块,通过DataStream询问数据块的分配,所以它在返回成功之前必须要完成配置文件中配置的最小副本数的复制操作


Yarn:
新的计算框架MRv2,是一个全局的资源管理器,负责整个集群的资源管理和分配,是Master/Slave模式
Master/slave模式:由 Master 服务器负责增、删、改操作,由 Slave 负责读操作,Master 一般只有一台,而 Slave 可以有好多台。Slave 与 Master 之间会有心跳数据包(一般数据库服务器会提供配置)。当 Master 有数据写入时 Master 会将数据同步至各 Slave 上。
yarn组件:
ApplicationMaster(应用管理器)
Container(容器)
ResourceManager(资源管理器)
NodeManager(节点管理器)
yarn的工作流程(背诵):
当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是启动ApplicationMaster;第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。如图2-11所示,YARN的工作流程分为以下几个步骤:
步骤1 用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
步骤2 ResourceManager为该应用程序分配第一个Container,并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。
步骤3 ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。
步骤4 ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
步骤5 一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。
步骤6 NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
步骤7 各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。
在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
步骤8 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。


HA:
高可用  可以增加RM的可用性(出现单点故障不影响RM的运行)
应用: nn HA 两种状态:active  standby


zookeeper:
分布式协调服务
解决分布式环境下的数据管理问题:
统一命名,状态同步,集群管理,配置同步
状态: leader follower 集群一般奇数台 过半选举
应用: 集群配置管理  HA


shell:
既是一种命令语言,又是一种程序设计语言,本身是用C语言编写的程序
shell脚本的两种运行方式:交互式、批处理


awk:
主要用来处理文本数据,处理效率高于shell,代码简洁高效,绝不会遇到内存溢出


sed:
sed是一种在线编辑器,流编辑器,它是文本处理中非常中的工具,能够完美的配合正则表达式使用,一次处理一行内容
sed主要用来自动编辑一个或多个文件;简化对文件的反复操作;编写转换程序等


正则


hadoop调度器:
调度器的作用是将系统中空闲的资源按一定策略分配给作业
1.默认的调度器FIFO 类似于栈 先进先出
2.计算能力调度器Capacity Scheduler
3.公平调度器Fair Scheduler
4.适用于机构集群的调度器LATE
5.适用于实时作业的调度器Deadline Scheduler和Constraint-based Scheduler
详细内容看董的博客:http://dongxicheng.org/mapreduce/hadoop-schedulers/


----------------------------1701B—裴博润------------------------------
     ---*-----------------------day01-------------------------*---
MapReduce
一种分布式计算模型,解决海量数据的计算问题
MapReduce将整个并行计算过程抽象到两个函数
Map(映射):对一些独立元素组成的列表的每一个元素进行指定的操作,可以高度并行。 
 Reduce(化简  归约):对一个列表的元素进行合并。List{}
一个简单的MapReduce程序只需要指定map()、reduce()、input和output,剩下的事由框架完成
特点:
易于编程
良好的扩展性
高容错性
适合PB级以上海量数据的离线处理
Mapreduce执行流程:
对多个map任务的输出,按照不同的分区,通过对网络copy到不同的reduce节点。
对多个mao任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
把reduce的输出保存到文件中


Map Task
Map引擎
解析每条数据记录,传递给用户编写的map()
将map()输出数据写入本地磁盘(如果是map-only作业,则直接写入HDFS)
Reduce Task
Reduce引擎
从Map Task上远程读取输入数据
对数据排序
对数据按照分组传递给用户编写的reduce()


块  片  maptask
数据储存分块  mr计算是task 在map端是maptask
一个分片对应一个maptask
切片: 默认按照块大小切片,即一个分块对应一个切片


hadoop自带辅助类:
GenericOptionsParser
GenericOptionsParser是一个类,用来解释常用的hadoop命令行选项,并根据需求,为Configuration对象设置相应的取值。通常不直接使用GenericOptionsParser,更方便的方式是:实现Tool接口,通过ToolRunner来运行应用程序,ToolRunner内部调用GenericOptionsParser:
public interface Tool extends Configurable{
int run(String [] args)throws Exception;
}


MapReduce执行流程
map任务处理
 
读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
 对输出的key、value进行分区。
 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
 (可选)分组后的数据进行归约。
reduce任务处理


对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
把reduce的输出保存到文件中。




----------------------------1701B—裴博润------------------------------
     ---*-----------------------day02-------------------------*---
做完作业预习shuffle
网址博客:
http://langyu.iteye.com/blog/992916
http://blog.csdn.net/gyflyx/article/details/16831015


----------------------------1701B—裴博润------------------------------
     ---*-----------------------day03-------------------------*---






预习shuffle


shuffle -- MapReduce 的核心过程
Shuffle是指从Map 产生输出开始,包括系统执行排序以及传送Map 输出到Reducer 作为输入的过程


shuffle过程的基本要求:
  1)完整地从map task端拉取数据到reduce task端
  2)在拉取数据的过程中,尽可能地减少网络资源的消耗
  3)尽可能地减少磁盘IO对task执行效率的影响


shuffle满足条件
1)保证拉取数据的完整性
2)尽可能地减少拉取数据的数据量
3)尽可能地使用节点的内存而不是磁盘


它分布在Mapreduce的map阶段和reduce阶段,共可分为6个详细的阶段
1).Collect阶段:将MapTask的结果输出到默认大小为100M的MapOutputBuffer内部环形内存缓冲区,保存
的是key/value,Partition分区


2).Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘
之前需要对数据进行一次排序的操作,先是对partition分区号进行排序,再对key排序,如果配置了
combiner,还会将有相同分区号和key的数据进行排序,如果有压缩设置,则还会对数据进行压缩操作。


3).Combiner阶段:等MapTask任务的数据处理完成之后,会对所有map产生的数据结果进行一次合并操作,
以确保一个MapTask最终只产生一个中间数据文件。


4).Copy阶段:当整个MapReduce作业的MapTask所完成的任务数据占到MapTask总数的5%时,JobTracker就会
调用ReduceTask启动,此时ReduceTask就会默认的启动5个线程到已经完成MapTask的节点上复制一份属于自
己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写
到磁盘之上。


5).Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存中和本地中的数据文件进行
合并操作。


6).Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,
ReduceTask只需做一次归并排序就可以保证Copy的数据的整体有效性。




Mapper 和 Reduce 类都有setup和cleanup方法
setup:初始化,加载字典。分布式缓存
cleanup:最终输出,一次性输出
setup(),此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高
cleanup(),此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作。若是将释放资源工作放入方法map()中,也会导致Mapper任务在解析、处理每一行文本后释放资源,而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高!




----------------------------1701B—裴博润------------------------------
     ---*-----------------------day04-------------------------*---


shuffle:
重写
1.partition
2.combiner
3.FileInoutFormat
4.FileOutputFormat
5.GroupingComparator


设置
1.reduce 个数(reduce个数必须和分区个数保持一致)
2.压缩 map 和 reduce


代码:
//自定义combiner 默认是Reducer
// job.setCombinerClass(Reducer.class);
//自定义分组比较器
// job.setGroupingComparatorClass(cls);
//自定义输入
// job.setInputFormatClass(cls);
//定义reduce个数
// job.setNumReduceTasks(2);
//自定义分区 默认HashPartitioner
// job.setPartitionerClass(cls);
//自定义输入
// job.setInputFormatClass(cls);
//自定义输出
// job.setOutputFormatClass(cls);


shuffle流程:
1.数据输入,切片 分块,每个块对应一个切片 每个切片对于一个maptask
2.执行map()方法
  分区 partitioner决定map输出数据由哪一个reduce来处理,reduce数量保持一致
  数据提交到环形内存缓冲区 默认100m 阀值80%  达到后进行spill溢写,数据会进行  sort排序,合并,压缩
3.reduce copy过程,远程拉取数据,单独起一个线程拉去数据 放到环形内存缓冲区,达到阀值后也会进行溢写 sort排序 分组 文件合并 压缩 到reduce进行业务处理
4.业务处理之后,最终结果落地到hdfs


----------------------------1701B—裴博润------------------------------
     ---*-----------------------day05-------------------------*---


自定义分区(reduce个数) 继承Partitioner<map输出key的类型, map输出value的类型,>
自定义数据类型 实现WritableComparable<本类类型>
自定义单区多文件 引用MultipleOutputs对象来输出 需关闭




----------------------------1701B—裴博润------------------------------
     ---*-----------------------day05-------------------------*---
map端join实现思想:加载分布式缓存
reduce端join实现思想:在map端打标记


一.MapReduce:处理大量海量数据(清洗)
1.数据处理复杂度
Count  平均值
分类,对比(join)
趋势(统计分析)
预测(人工智能)

2.来源和特点
源自于Google的MapReduce论文,发表于2004年12月,Hadoop mapreduce是Google MapReduce克隆版


特点:
易于编程
良好的扩展性
高容错
适合PB级以上海量数据的离线处理


3.MapReduce 编程模型
一种分布式计算模型,解决海量数据的计算问题
MapReduce将整个并行计算过程抽象到两个函数
1)Map(映射):对一些独立元素组成的列表的每一个元素进行指定的操作,可以高度并行。 
2)Reduce(化简  归约):对一个列表的元素进行合并。List{}


一个简单的MapReduce程序只需要指定map()、reduce()、input和output,剩下的事由框架完成


4.MapReduce的概念
什么是job和Task? (一个job拆分出来有多个task,job包含task)
job:用户提交的每一个计算请求,称为一个作业。(一次计算请求)
Task:每一个作业,都需要拆分开了,交由多个服务器来完成,拆分出来的执行单位,就称为任务。 
Task分为MapTask和ReduceTask两种,分别进行Map操作和Reduce操作,依据Job设置的Map类和Reduce类.


5.Map Task (在Map引擎端运行的,解析每一条记录,用来做映射)
· Map引擎
· 解析每条数据记录,传递给用户编写的map()
· 将map()输出数据写入本地磁盘(如果map-only作业,则直接写入HDFS)


6.Reduce Task (在Reduce端执行的,Reduce Task的输入就是Map Task端的输出,规约不同的Map Task数据)
· Reduce引擎
· 从Map Task上远程读取输入数据
· 对数据排序
· 将数据按照分组传递给用户编写的reduce()


附加:
jobTracker
Jobtracker是主线程,它负责接收客户作业提交,调度任务到工作节点上运行,并提供诸如监控工作节点状态及任务进度等管理功能,一个MapReduce集群有一个jobtracker,一般运行在可靠的硬件上。
tasktracker是通过周期性的心跳来通知jobtracker其当前的健康状态,每一次心跳包含了可用的map和reduce任务数目、占用的数目以及运行中的任务详细信息。Jobtracker利用一个线程池来同时处理心跳 和客户请求。
当一个任务被提交时,组成作业的每一个任务的信息都会存储在内存中,在任务运行的时候,这些任务会伴随着tasktracker的心跳而更新,因此能近乎实时的反映任务进度和健康状况。
tasTtracker
第二个后台程序—tasktracker—由jobtracker指派任务,实例化用户程序,在本地执行任务并周期性地向jobtracker汇报状态。在每一个工作节点上永远只会有一个tasktracker。tasktracker和 DataNode运行在一个机器上,从而使得每一台物理机器既是一个计算节点,同时也是一个存储节点。每一个tasktracker能够配置map和reduce的任务片数(taskslot),这个数字代表每一种任务能被并行执行的数




7.块 , 片 , maptask 关系
数据存储分块(默认128M),MapReduce计算是task,在map端是maptask,一个切片对应一个maptask
切片:默认按照块大小切片,即一个分块对应一个切片


8.MapReduce ----- Map 执行流程
1)读取输入文件内容,解析成key、value对。
2)对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。 
3)写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
4)对输出的key、value进行分区。对不同分区的数据,按照key进行排序、分组。
5)相同key的value放到一个集合中。(可选)分组后的数据进行归约。


9.MapReduce ----- Reduce 执行流程
1)对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2)对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
3)把reduce的输出保存到文件中。


10.WordCount处理过程
1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows/Linux环境不同)。
2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对
3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。
4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果


11.Hadoop辅助类
GenericOptionsParser是一个类,用来解释常用的hadoop命令行选项,并根据需要,为Configuration对象设置相应的取值。通常不直接使用GenericOptionsParser,
另一个方式是:实现Tool接口,通过ToolRunner来运行应用程序,ToolRunner内部调用GenericOptionsParser


12.MapReduce基本数据类型
Writable实现 序列化大小(字节)
IntWritable  1
VintWritable 1~5
ByteWritable 1
BooleanWritable 1
FloatWritable 4
LongWritable 8
VlongWritable 1~9
DoubleWritable 8
Text -- (String)
nullWritable 占位 0字节


13.mapper 和 reducer 类都有setup 和 cleanup方法
setup:初始化。加载字典。分布式缓存
cleanup:最终输出,一次性输出






二.Shuffle 
1.概念: map端的输入到reduce端的输出(本意:洗牌,打乱)
Conllections.shuffle(List):随机的打乱参数list里的元素顺序
shuffle机制 ---- MapReduce 的核心过程 描述着数据从maptask输出到reducetask输入的这段过程,一种数据调度机制


2.shuffle中最重要的功能是分组排序
maptask端先输出到本地缓存(内存缓冲区和磁盘文件)进行分组排序,在reducetask端还要再次进行合并排序


3.shuffle流程(对应PPT):
1)数据输入(input,可以获取文件列表,文件位置,分片的位置),根据块大小进行分片(切片对应一个maptask)找到对应的maptask
2)执行map()方法(每个map有一个环形内存缓冲区,用于存储任务的输出,默认大小100MB,一旦达到阈值80%,会出现溢写(spill)到磁盘,这个过程会发生排序(sort),再合并(combiner合并:数据的合并,把相同key的value进行合并;merge合并:文件的合并,压缩如果设置则会发生)
3)map输出的结果到磁盘partitioner是决定map输出的数据由哪一个reduce来处理,并且分区的个数必须和reduce的数量保持一致
4)到reduce端从磁盘中远程拉取数据(copy过程),单独起一个线程,也会有一个环形内存缓冲区,达到阈值后也会溢写文件到本地,再合并(merge)
5)在reduce()方法里进行业务处理,并输出到hdfs


merge合并有三种:
1)内存到内存
2)内存到磁盘(默认的)
3)磁盘到磁盘


map阶段溢写:1.sort排序(默认key按照字典排序)2.数据合并(combiner)3.文件合并(merge)4.压缩(map端设置压缩就会执行)


reduce阶段溢写:1.sort排序(默认字典排序)2.分组,将相同的key的value放到一个容器的过程 3.merge:文件合并4.压缩:compress


mapreduce基本流程
一个切片对应一个maptask,默认一个reduce,通过job.setNumReduceTasks()来设置reduce的个数


mapreduce分片阶段流程
数据-->分块--->map映射--->shuffle打乱数据(把相同key放在一起)--->reduce归约(把相同key归约到一起)-->结果


输入时map(k:v)k:偏移量,v:一行数据
输出是map(k:v)k:标识符,v:根据需求定义
reduce(k:v)k:标识符,v:list


(网上信息)
Combiner
1、是在每一个map task的本地运行,能收到map输出的每一个key的valuelist,所以可以做局部汇总处理
2、因为在map task的本地进行了局部汇总,就会让map端的输出数据量大幅精简,减小shuffle过程的网络IO
3、combiner其实就是一个reducer组件,跟真实的reducer的区别就在于,combiner运行maptask的本地
4、combiner在使用时需要注意,输入输出KV数据类型要跟map和reduce的相应数据类型匹配
5、要注意业务逻辑不能因为combiner的加入而受影响




它分布在Mapreduce的map阶段和reduce阶段,共可分为6个详细的阶段
1).Collect阶段:将MapTask的结果输出到默认大小为100M的MapOutputBuffer内部环形内存缓冲区,保存
的是key/value,Partition分区


2).Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘
之前需要对数据进行一次排序的操作,先是对partition分区号进行排序,再对key排序,如果配置了
combiner,还会将有相同分区号和key的数据进行排序,如果有压缩设置,则还会对数据进行压缩操作。


3).Combiner阶段:等MapTask任务的数据处理完成之后,会对所有map产生的数据结果进行一次合并操作,
以确保一个MapTask最终只产生一个中间数据文件。


4).Copy阶段:当整个MapReduce作业的MapTask所完成的任务数据占到MapTask总数的5%时,JobTracker就会
调用ReduceTask启动,此时ReduceTask就会默认的启动5个线程到已经完成MapTask的节点上复制一份属于自
己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写
到磁盘之上。


5).Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存中和本地中的数据文件进行
合并操作。


6).Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,
ReduceTask只需做一次归并排序就可以保证Copy的数据的整体有效性。


4.260M 分几个块?
1)128M 2)132M
300M数据分几块?
300/128=2.34375>1.1,接着300-128=172,172/128=1.34375>1.1,接着172-128=44,44/128=0.34375,所以分3块,分别为:128M,128M,44M。
分块:最后一个块 冗余10%,只要不超过10%就放在一个块,块大小是最后文件的大小






5.Shuffle可重写 :
1)partition
2)combiner
3)FileInputFormat
4)FileOutputFormat
5)GroupingComparator(分组比较器)


设置:
自定义分区 默认HashPartitioner
job.setPartitionerClass(HashPartitioner.class);
自定 combiner 默认reducer
job.setCombinerClass(Reducer.class);
自定分组比较器
job.setGroupingComparatorClass(cls);
自定义输出入
job.setInputFormatClass(cls);
自定义输出
job.setOutputFormatClass(cls);
定义reduce个数
job.setNumReduceTasks(2);
设置压缩
job.MAP_OUTPUT_COMPRESS
设置压缩2
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass(name, theClass, xface);


----------------------------1701B—裴博润------------------------------
     ---*-----------------------day06-------------------------*---
----------------------------1701B—裴博润------------------------------
     ---*-----------------------总结-------------------------*---


一.MapReduce:处理大量海量数据(清洗)
1.数据处理复杂度
Count  平均值
分类,对比(join)
趋势(统计分析)
预测(人工智能)

2.来源和特点 -
源自于Google的MapReduce论文,发表于2004年12月,Hadoop mapreduce是Google MapReduce克隆版


特点:
易于编程
良好的扩展性
高容错
适合PB级以上海量数据的离线处理


3.MapReduce 编程模型
一种分布式计算模型,解决海量数据的计算问题
MapReduce将整个并行计算过程抽象到两个函数
1)Map(映射):对一些独立元素组成的列表的每一个元素进行指定的操作,可以高度并行。 
2)Reduce(化简  归约):对一个列表的元素进行合并。List{}


一个简单的MapReduce程序只需要指定map()、reduce()、input和output,剩下的事由框架完成


4.MapReduce的概念
什么是job和Task? (一个job拆分出来有多个task,job包含task)
job:用户提交的每一个计算请求,称为一个作业。(一次计算请求)
Task:每一个作业,都需要拆分开了,交由多个服务器来完成,拆分出来的执行单位,就称为任务。 
Task分为MapTask和ReduceTask两种,分别进行Map操作和Reduce操作,依据Job设置的Map类和Reduce类.


5.Map Task (在Map引擎端运行的,解析每一条记录,用来做映射)
· Map引擎
· 解析每条数据记录,传递给用户编写的map()
· 将map()输出数据写入本地磁盘(如果map-only作业,则直接写入HDFS)


6.Reduce Task (在Reduce端执行的,Reduce Task的输入就是Map Task端的输出,规约不同的Map Task数据)
· Reduce引擎
· 从Map Task上远程读取输入数据
· 对数据排序
· 将数据按照分组传递给用户编写的reduce()


7.块 , 片 , maptask 关系
数据存储分块(默认128M),MapReduce计算是task,在map端是maptask,一个切片对应一个maptask
切片:默认按照块大小切片,即一个分块对应一个切片


8.MapReduce ----- Map 执行流程
1)读取输入文件内容,解析成key、value对。
2)对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。 
3)写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
4)对输出的key、value进行分区。对不同分区的数据,按照key进行排序、分组。
5)相同key的value放到一个集合中。(可选)分组后的数据进行归约。


9.MapReduce ----- Reduce 执行流程
1)对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2)对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
3)把reduce的输出保存到文件中。


10.WordCount处理过程
1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows/Linux环境不同)。
2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对
3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。
4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果


11.Hadoop辅助类
GenericOptionsParser是一个类,用来解释常用的hadoop命令行选项,并根据需要,为Configuration对象设置相应的取值。通常不直接使用GenericOptionsParser,
另一个方式是:实现Tool接口,通过ToolRunner来运行应用程序,ToolRunner内部调用GenericOptionsParser


12.MapReduce基本数据类型
Writable实现 序列化大小(字节)
IntWritable  1
VintWritable 1~5
ByteWritable 1
BooleanWritable 1
FloatWritable 4
LongWritable 8
VlongWritable 1~9
DoubleWritable 8
Text -- (String)
nullWritable 占位 0字节


13.mapper 和 reducer 类都有setup 和 cleanup方法
setup:初始化。加载字典。分布式缓存
cleanup:最终输出,一次性输出






二.Shuffle 
1.概念: map端的输入到reduce端的输出(本意:洗牌,打乱)
Conllections.shuffle(List):随机的打乱参数list里的元素顺序
shuffle机制 ---- MapReduce 的核心过程 描述着数据从maptask输出到reducetask输入的这段过程,一种数据调度机制


2.shuffle中最重要的功能是分组排序
maptask端先输出到本地缓存(内存缓冲区和磁盘文件)进行分组排序,在reducetask端还要再次进行合并排序


3.shuffle流程(对应PPT):
1)数据输入(input,可以获取文件列表,文件位置,分片的位置),根据块大小进行分片(切片对应一个maptask)找到对应的maptask
2)执行map()方法(每个map有一个环形内存缓冲区,用于存储任务的输出,默认大小100MB,一旦达到阈值80%,会出现溢写(spill)到磁盘,这个过程会发生排序(sort),再合并(combiner合并:数据的合并,把相同key的value进行合并;merge合并:文件的合并,压缩如果设置则会发生)
3)map输出的结果到磁盘partitioner是决定map输出的数据由哪一个reduce来处理,并且分区的个数必须和reduce的数量保持一致
4)到reduce端从磁盘中远程拉取数据(copy过程),单独起一个线程,也会有一个环形内存缓冲区,达到阈值后也会溢写文件到本地,再合并(merge)
5)在reduce()方法里进行业务处理,并输出到hdfs


merge合并有三?郑?
1)内存到内存
2)内存到磁盘(默认的)
3)磁盘到磁盘


map阶段溢写:1.sort排序(默认key按照字典排序)2.数据合并(combiner)3.文件合并(merge)4.压缩(map端设置压缩就会执行)


reduce阶段溢写:1.sort排序(默认字典排序)2.分组,将相同的key的value放到一个容器的过程 3.merge:文件合并4.压缩:compress


mapreduce基本流程
一个切片对应一个maptask,默认一个reduce,通过job.setNumReduceTasks()来设置reduce的个数


mapreduce分片阶段流程
数据-->分块--->map映射--->shuffle打乱数据(把相同key放在一起)--->reduce归约(把相同key归约到一起)-->结果


输入时map(k:v)k:偏移量,v:一行数据
输出是map(k:v)k:标识符,v:根据需求定义
reduce(k:v)k:标识符,v:list


(网上信息)
Combiner
1、是在每一个map task的本地运行,能收到map输出的每一个key的valuelist,所以可以做局部汇总处理
2、因为在map task的本地进行了局部汇总,就会让map端的输出数据量大幅精简,减小shuffle过程的网络IO
3、combiner其实就是一个reducer组件,跟真实的reducer的区别就在于,combiner运行maptask的本地
4、combiner在使用时需要注意,输入输出KV数据类型要跟map和reduce的相应数据类型匹配
5、要注意业务逻辑不能因为combiner的加入而受影响




它分布在Mapreduce的map阶段和reduce阶段,共可分为6个详细的阶段
1).Collect阶段:将MapTask的结果输出到默认大小为100M的MapOutputBuffer内部环形内存缓冲区,保存
的是key/value,Partition分区


2).Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘
之前需要对数据进行一次排序的操作,先是对partition分区号进行排序,再对key排序,如果配置了
combiner,还会将有相同分区号和key的数据进行排序,如果有压缩设置,则还会对数据进行压缩操作。


3).Combiner阶段:等MapTask任务的数据处理完成之后,会对所有map产生的数据结果进行一次合并操作,
以确保一个MapTask最终只产生一个中间数据文件。


4).Copy阶段:当整个MapReduce作业的MapTask所完成的任务数据占到MapTask总数的5%时,JobTracker就会
调用ReduceTask启动,此时ReduceTask就会默认的启动5个线程到已经完成MapTask的节点上复制一份属于自
己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写
到磁盘之上。


5).Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存中和本地中的数据文件进行
合并操作。


6).Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,
ReduceTask只需做一次归并排序就可以保证Copy的数据的整体有效性。


4.260M 分几个块?
1)128M 2)132M
300M数据分几块?
300/128=2.34375>1.1,接着300-128=172,172/128=1.34375>1.1,接着172-128=44,44/128=0.34375,所以分3块,分别为:128M,128M,44M。
分块:最后一个块 冗余10%,只要不超过10%就放在一个块,块大小是最后文件的大小






5.Shuffle可重写 :
1)partition
2)combiner
3)FileInputFormat
4)FileOutputFormat
5)GroupingComparator(分组比较器)


设置:
自定义分区 默认HashPartitioner
job.setPartitionerClass(HashPartitioner.class);
自定 combiner 默认reducer
job.setCombinerClass(Reducer.class);
自定分组比较器
job.setGroupingComparatorClass(cls);
自定义输出入
job.setInputFormatClass(cls);
自定义输出
job.setOutputFormatClass(cls);
定义reduce个数
job.setNumReduceTasks(2);
设置压缩
job.MAP_OUTPUT_COMPRESS
设置压缩2
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass(name, theClass, xface);


5.分布式缓存 
(DistributedCache--hadoop 1.0  过时)
Path[] paths = context.getLocalCacheFiles();
setup() 预处理 ----加载 无意义的单词到容器
map() 读取数据,处理(一行数据通过切分,判断过滤单词是否无意义 true--输出)
使用StringTokenizer类默认按照  " \t\n\r\f" 进行切分 
一行数据通过切分,判断过滤单词是否无意义 true--输出
job对象 添加缓存文件--hdfs文件存储系统上---job.addCacheFile(new URI("hdfs://m1:8020/in/dic.txt"));


6.join分为:map join  和  reduce join


MapJoin实现思想:
分布式缓存加载小文件(加载字典文件)到内存,在map端提交业务处理


ReduceJoin实现思想
实现思想:把相同字段key做标志,在reduce端实现具体的业务


map端:
key:value
word 1-word
word 2-hello


reduce端:
key :value
word  list(1-word,2-hello)


CSDN博客:http://blog.csdn.net/xyilu/article/details/8996204


7.字符串str反转
str.length
方法一:
for(i =str.length-1,i<=0,i-- ){
chatAt(i)
}
方法二:
for(i=0,str.length-1,i++){
str.subString(str.length-i-2,str.length-i-1);
}


8.自定义数据类型
实现WritableComparable接口
重写--hashCode() equals() toString() 生成get()set()
重写接口方法
readFields()
write()
compareTo()


9.自定义分区:
默认分区:HashPartitioner
自定义
1)需要继承 Partitioner
2)重写getPartitioner()
3)区的个数 和 reducetask个数保持一致


10.二次排序
首先第一次对第一列排序,如果第一列相同则比较第二列,所有的列都是有序的 (三次排序,以此类推)


在自定义类型的类中:Writable
write()是把每个对象 序列化 到输出流
readFields()是把输入流字节 反序列化


WritableComparable
java值对象的比较:重写toString(),hashCode(),equals()方法


设置排序规则类:
job.setSortComparatorClass(cls)
设置分组规则类:
job.setGroupingComparatorClass(cls)


类Comparable<T>中public int comparaTo(To)方法比较大小
SortComparator,GroupingComparator实现RawComparator类


二次排序使用场景
"数据排序"是许多实际任务执行时要完成的第一项工作,比如学生成绩评比,数据建立索引等
这个实例和数据去重类似,都是先对原数据进行初步处理,为进一步的数据操作打好基础


分组排序: 第一列升序,第二列降序 ---把第一列相同key放到一个分组中,再对第二列排序
实现 RawComparator
重写compara方法


自定义combiner 
//需继承reducer类 
//输入是map端的输出,输出是reduce的输入


11.序列化 , 反序列化
序列化:将数据或对象转化成二进制流,便于网络间的传输
反序列化:还原二进制流的过程


12.数据倾斜
在hadoop程序运行过程中,其他task都执行完毕,某一个task执行时间比较长,因为其中的key过多,拖累了整个程序,造成整个程序运行时间比较长,一般都是某一个key数据量表较大,是其他key的百倍或千倍


解决办法:
加随机前缀,打散key
颗粒设置task个数
调优提高
(网上方法)
1. 增加reduce 的jvm内存
2. 增加reduce 个数
3. customer partition
4. 其他优化的讨论.
5. reduce sort merge排序算法的讨论
6. 正在实现中的hive skewed join.
7. pipeline
8. distinct
9. index 尤其是bitmap index


13.倒排索引
倒排索引是文档检索系统中最常用的数据结构,被广泛的应用于全文搜索引擎。
它主要用来存储某个单词(或词组),在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式,
由于不是根据文档来确定文档所包含的内容,而是进行了相反的操作,因而被称为倒排索引。


14.数据清洗
数据格式:
/** id 手机号 MAC IP URL TYPE 状态1/状态2/上行流量/下行流量/状态3/时间
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 http://news.ifeng.com/a/20161025/50148672_0.shtml 视频网站 15 12 1527 2106 200 1196413077278

数据包含 脏数据和灰色数据
说明:
脏数据:少手机号,或者手机号不完整
灰色数据:缺少上行流量 或者 下行流量  或者  状态3不为200 或者缺少其他字段等


状态3 :
200:正常  400:没有成功  500:内部错误


15.MapReduce压缩
输入的文件的压缩--压缩包
如果输入的文件是压缩过的,那么在被mapreduce读取时,他们会被自动解压,根据文件扩展名来决定应该使用哪一个压缩解码器


MapReduce作业的输出的压缩
map输出结果的压缩
Configuration  conf = new Configuration();
conf.setBoolean("mapred.compress.map.output",true);
conf.setClass("mapred.map.output.compression.codec",GzipCodec.class.CompressionCodec.class);


Reduce 输出压缩吧到hdfs
打开reduce输出压缩设置
FileOutputFormat.setCompressOutput(job,true);
设置使用压缩算法
FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);


配置文件中设置属性:
mapreduce.output.compress:true
mapreduce.output.compression.codec:压缩编码/编码器的类型名
设置压缩类型:
mapreduce.output.compression.type:默认RECORD,它压缩单独的记录;将他改为BLOCK,则可以压缩一组记录 推荐使用


使用压缩的方式:
DEFLATE N/A DEFLATE .deflate No
压缩格式 split native 压缩率 速度 是否hadoop自带 Linux命令 换成压缩格式后,原来的应用程序是否要修改


Gzip 很高 比较快 是,直接使用 和文本处理一样,不需要修改
lzo 比较高 很快 否,需要安装 需要间索引,还需要指定输入格式
Snappy 比较高 很快 否,需要安装 没有 和文本处理一样,不需要修改
bzip2 最高 是,直接使用 和文本处理一样,不需要修改


16.Job相互依赖
ControlledJob及JobControl,提供mapreduce任务流的串接支持设置第一个job
第二个任务的输入目录设置为第一个任务的输出目录(必须显示设置)


设置第一个job:
Job job1 = Job.getInstance(conf, "Demo03JobChain");
job1.setMapperClass(CountMapper.class);
job1.setReducerClass(CountReducer.class);
FileInputFormat.addInputPath(job1, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job1, new Path(arg0[1]));
设置第二个job:
Job job2 = Job.getInstance(conf, "Demo03JobChain");
job2.setMapperClass(TopnMapper.class);
job2.setReducerClass(TopReducer.class);
FileInputFormat.addInputPath(job2, new Path(arg0[1]));
FileOutputFormat.setOutputPath(job2, new Path(arg0[2]));
用ControlledJob对象封装两个任务
ControlledJob cl = new ControlledJob(job1.getConfiguration());
ControlledJob cl1 = new ControlledJob(job2.getConfiguration());
添加依赖 第二个依赖第一个
cl1.addDependingJob(cl);
用JobControl封装任务组
jobc.addJob(cl);
jobc.addJob(cl1);
执行任务--线程Thread
Thread th = new Thread(jobc);
th.start();

while(!jobc.allFinished()){
Thread.sleep(1000);
}
jobc.stop();


被依赖的任务执行完后,后续任务才开始执行,
任何一个先执行的被依赖任务失败,都会导致后续任务失败


mapreduce的压缩方式主要有以下几种:
DEFLATE、gzip、bzip2、LZ0、LZ4、Snappy

www.htsjk.Com true http://www.htsjk.com/Hadoop/41427.html NewsArticle Hadoop(二), ----------------------------1701B—裴博润------------------------------      ---*-----------------------复习-------------------------*--- Linux Linux是一套免费使用和自由传播的类Unix的操作系统 c语...
相关文章
    暂无相关文章
评论暂时关闭