Hadoop之MapReduce,
Hadoop MapReduce
MapReduce是一种编程模型,用于大规模数据集的并行运算。概念"Map(映射)“和"Reduce(归约)”,是它们的主要思想,都是从函数式编程语言(适合在网络中传递方法)里借来的,还有从矢量编程语言里借来的特性。
Hadoop中MapReduce计算框架充分的利用了存储节点所在物理主机的内存
、CPU
、网络
、少许磁盘
完成对大数据集的分布式计算。框架一般会在所有的DataNode所在的物理主机上启动NodeManager服务,NodeManager服务用于管理该服务运行的物理节点的计算资源。除此之外系统一般会启动一个ResourceManager用于统筹整个计算过程中的资源调度问题。
MapReduce计算核心思想是将一个大的计算任务,拆分成若干个小任务,每个小任务独立运行,并且得到计算结果,一般是将计算结果存储在本地。当第一批次任务执行结束,系统会启动第二批次任务,第二批次的任务作用是将第一批次的计算临时结果通过网路下载汇集到本地,然后在本地执行最终的汇总计算。
可以理解为当使用MapReduce执行大数据统计分析时,系统会将分析数据进行切分,我们将切分信息 称为任务切片(实质是对分析目标数据的一种逻辑区间映射)。任务在执行的时候会更具任务切片的数目决定Map阶段计算的并行度。也就意味着Map阶段完成的是数据的局部计算。一个Map任务就代表着一个计算资源。当所有的Map任务都完成了对应区间的数据的局部计算后,Map任务会将计算结果存储在本地磁盘上。紧接着系统会按照系统预设的汇总并行度启动多个Reduce任务对Map阶段计算结果进行汇总,并且将结果内容输出到HDFS、MySQL、NoSQL中。
Map Reduce 2 架构图
ResourceManager
:统筹管理计算资源
NodeManager
:启动计算资源(Container)例如:MRAppMaster、YarnChild同时NM连接RM汇报自身一些资源占用信息。
MRAppMaster
:应用的Master负责任务计算过程中的任务监控、故障转移,每个Job只有一个。
YARNChild
:表示MapTask
、ReduceTask
的总称,表示一个计算进程。
Yarn架构图
构建MR运行环境
- etc/hadoop/mapred-site.xml
[root@CentOS ~]# cp /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml.template /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
- etc/hadoop/yarn-site.xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>CentOS</value>
</property>
- 启动YARN
[root@CentOS ~]# start-yarn.sh
[root@CentOS ~]# jps
5250 Jps
4962 ResourceManager
5043 NodeManager
3075 DataNode
3219 SecondaryNameNode
2959 NameNode
MR 第一个HelloWorld
INFO 192.168.0.1 wx com.baizhi.service.IUserService#login
INFO 192.168.0.4 wx com.baizhi.service.IUserService#login
INFO 192.168.0.2 wx com.baizhi.service.IUserService#login
INFO 192.168.0.3 wx com.baizhi.service.IUserService#login
INFO 192.168.0.1 wx com.baizhi.service.IUserService#login
SQL
create table t_access(
level varchar(32),
ip varchar(64),
app varchar(64),
server varchar(128)
)
select ip,sum(1) from t_access group by ip;
reduce(ip,[1,1,1,..]) map(ip,1)
- Mapper逻辑
public class AccessMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
/**
*INFO 192.168.0.1 wx com.baizhi.service.IUserService#login
* @param key : 行字节偏移量
* @param value:一行文本数据
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens=value.toString().split(" ");
String ip=tokens[1];
context.write(new Text(ip),new IntWritable(1));
}
}
- Reducer逻辑
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class AccessReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int total=0;
for (IntWritable value : values) {
total+=value.get();
}
context.write(key,new IntWritable(total));
}
}
- 任务提交
public class CustomJobSubmitter extends Configured implements Tool {
public int run(String[] args) throws Exception {
//1.封装job
Configuration conf=getConf();
Job job=Job.getInstance(conf);
//2.设置分析|输出数据格式类型
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//3.设置数据读入和写出路径
Path src = new Path("/demo/access");
TextInputFormat.addInputPath(job,src);
Path dst = new Path("/demo/res");//必须为null
TextOutputFormat.setOutputPath(job,dst);
//4.设置Mapper和Reducer逻辑
job.setMapperClass(AccessMapper.class);
job.setReducerClass(AccessReducer.class);
//5.设置Mapper和Reducer的输出k/v类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.提交任务
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new CustomJobSubmitter(),args);
}
}
-
任务提交
- jar任务发布
job.setJarByClass(CustomJobSubmitter.class);
[root@CentOS ~]# hadoop jar mr01-1.0-SNAPSHOT.jar com.baizhi.mr.CustomJobSubmitter
- 本地仿真
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method) at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:557)
覆盖NativeIO文件,将557行代码修改如下
public static boolean access(String path, AccessRight desiredAccess) throws IOException { return true; }
-
跨平台发布
- 将core|hdfs|yarn|mapred-site.xml拷贝项目的resources目录
- 在mapred-site.xml中开启跨平台提交
<property> <name>mapreduce.app-submission.cross-platform</name> <value>true</value> </property>
- 在任务提交代码中添加以下代码片段
conf.addResource("core-site.xml"); conf.addResource("hdfs-site.xml"); conf.addResource("mapred-site.xml"); conf.addResource("yarn-site.xml"); conf.set("mapreduce.job.jar","file:///xxxx.jar");
本地提交时,由于需要拷贝计算资源到HDFS所有可能会报以下异常:
xception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=HIAPAD, access=EXECUTE, inode="/tmp":root:supergroup:drwx------ at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:208) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:171)
以上异常解决方案,参考上一个章节
关闭hdfs权限
!
上一篇:大数据Hadoop之HDFS
下一篇:Map Reduce Shuffle(洗牌)