欢迎投稿

今日深度:

Hadoop,

Hadoop,


 

Apache Lucene开源的高性能全文检索工具包

Apache Nutch开源的Web搜索引擎

Google三大论文(GFS/MapReduce/BigTable)

Apache Hadoop大规模数据批处理

Hadoop Common为其他Hadoop模块提供基础设施(包含网络通信rpc框架)

Hadoop HDFS一个高可靠高吞吐量的分布式文件系统

Hadoop MapReduce分布式计算框架

Hadoop Yarn任务管理与资源调度(Hadoop2.x才出现)

文件系统:磁盘上组织文件的方法

单机文件系统:

windows:FAT16、FAT32、NTFS

linux:ext2/3/4、VFS

分布式文件系统:hdfs

NameNode:主节点,存储文件的元数据如文件名,文件目录,文件属性(生产时间、副本数、文件权限),以及每个文件的块列表和块所在DataNode等

DataNode:存储块数据、块数据的校验和checksum、编码

SecondaryNameNode:监控hdfs状态的辅助后台程序,每隔一段时间获取hdfs元数据的快照。分担namenode压力,合并编辑日志editlog和镜像文件fsimage(因为合并操作需要占用很大资源,影响客户端你请求),合并后将最终的镜像文件fsimage返回给namenode进行处理

数据存储在hdfs-site.xml的dfs.namenode.name.dir属性配置中

数据存储在hdfs-site.xml的dfs.datanode.data.dir属性配置中

存储内容:数据本身和数据长度、校验和、时间戳

 

DataNode启动后会向NameNode注册,注册后会周期性(1小时)向NameNode发送块报告BlockReport(Block块与DataNode节点的映射关系(第二映射关系))

DataNode周期性发送心跳(3秒)给NameNode,心跳返回结果带有NameNode下发给DataNode的命令,如果超过10分钟NameNode没有收到DataNode的心跳,则认为该节点不可用

备份那些组成文件系统元数据持久状态的文件

运行一个secondarynamenode

该模式下,Client只能进行查看,不能进行写入和删除操作

HDFS集群启动后会先进入安全模式,检查数据块和DataNode的完整性。检查具体内容如下:

配置项:dfs.namenode.replication.min 最小block副本数 默认是1

      dfs.namenode.safemode.threshold-pct 百分比 默认是0.999f

配置项:dfs.namenode.safemode.min.datanodes 最小可用dn默认是0

配置项:dfs.namenode.safemode.extension 默认是1ms

 

命令操作:bin/hdfs dfsadmin -safemode <command>

get查看当前状态

enter进入安全模式

leave强制离开安全模式

wait阻塞,直到退出才继续执行

默认3个,通过dfs.replication属性配置

第一个block副本放在和client所在的node里面(如果client不在集群范围内,则这第一个node时随机选取的,当然系统会尝试不选择哪些不满或太忙的node)

第二个副本放置在与第一个节点不同的机架中的node中(随机选择)

第三个副本和第二个在同一个机架,随机放在不同的node中,如果还有更多的副本就随机放在集群的node里

ResourceManager(RM):处理客户端请求、启动/监控ApplicationMaster、监控NodeManager、资源分配与调度

NodeManager(NM):单个节点上的资源管理、处理来自RM的命令、处理来自AM的命令

ApplicationMaster(AM):程序切分、为应用程序申请资源并分配任务、任务监控与容错

Container:对任务运行环境的抽象,封装了CPU、内存等多维资源以及环境变量和启动命令等任务运行相关的信息

ResourceManager(RM)包含ApplicationManager和ResourceSchedular:处理客户端请求,启动/监控ApplicationMaster,监控NodeManager,资源分配调度

NodeManager(NM):单个节点上的ziyuanguanl,chuli来自ResourceManager和ApplicationMaster的命令,

ApplicationMaster:程序切分,为应用程序申请资源,任务监控与容错

Container:对任务运行环境的抽象,封装了cpu、内存等多维资源以及环境变量、启动命令等任务运行相关的信息

 

Hadoop:hdfs+mapreduce+yarn

Lucene:索引检索工具包

Solr:索引服务器

Nutch:开源的搜索引擎

HBase/Cassandra:基于google的BigTable开源的列式存储的菲关系向数据库

Hive:基于SQL的分布式计算引擎,也是一个数据仓库

Pig:基于PIg Latin脚本的计算框架

Thrift/Avro:rpc框架,用于网络通信

BigTop:项目测试、打包、部署

Oozie/Azakban:大数据的工作流框架

Chukwa/Scribe/FLume:数据收集框架

Whirr:部署为云服务的类库

Sqoop:数据迁移工具

Zookeeper:分布式协调服务框架

HAMA:图计算框架

Mahout:机器学习框架

单机:在一台单机上运行,没有分布式文件系统,而是直接读写本地操作系统的文件系统

伪分布式:一台服务器上运行多个进程

分布式

Apache版本

CDH版本(cloudera公司)

HDP版本(HortOnWorks公司)

作用:同步集群上服务器的时间,保持时间一致

例:机器1当成时间服务器,机器2和机器3同步机器1上的时间

1.修改步骤机器1配置ntp时间服务器

vi /etc/sysconfig/ntpd

SYNC_HWCLOCK=yes

vi /etc/ntp.conf

注释以下三行

server 0.centos.pool.ntp.org

server 1.centos.pool.ntp.org

server 2.centos.pool.ntp.org

取消注释以下一行

#restrict 192.168.160.0 mask 255.255.255.0 nomodify notrap

取消注释以下两行

#server 127.127.1.0 #local clock

#fudge 127.127.1.0 stratum 10

2.机器1重启ntp服务

service ntpd restart

date -s yyyy-MM-dd

date -s HH:mm:ss

4.机器2和机器3同步机器1上的时间

手动同步

/usr/sbin/ntpdate 机器1的hostname

定时同步

crontab -e

0-59/10 * * * * /usr/sbin/ntpdate 机器1的hostname

分 时 天 月 周

注:NM启动后去RM上进行注册,会不断发送心跳,说明处于存活状态

资源调度ResourceScheduler

FIFO Scheduler按作业提交的顺序排成一个先进先出队列

Capacity Scheduler预先划分为多个队列,每个队列按FIFO(默认)或DRF方式分配资源

Fair Scheduler动态划分也可预先划分队列,每个队列按Fair(默认)或FIFO或DRF方式分配资源

注:DRF算法(主资源公平算法)

面向大数据并行处理的分布式计算框架,采用分而治之的思想,即任务的分解和结果的汇总,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。

map:并行处理输入数据

shuffle:连接map和reduce阶段,maptask将数据写到本地磁盘,reducetask从每个maptask上读取一份数据

reduce:对map结果进行汇总

JobTracker:负责调度和管理TaskTracker和监控任务的运行状况,运行在master节点上,将mappers和reducers分配给空闲的tasktracker,由tasktracker负责任务的并行执行

TaskTracker:运行在Datanode上

仅适合离线批处理,具有很好的容错性和扩展性,适合简单的批处理任务

系统开销大,过多使用磁盘导致效率低下

适合大文件存储,流式数据访问,不适合大量小文件、随机写入、低延迟读取

高吞吐:为大量数据访问提供高吞吐量支持

高容错:硬件故障是常态

大文件存储:支持存储TP-PB级别的数据

横向扩展:添加机器

数据一致性:一次写入多次读取

多硬件平台:HDFS易于运行在不同的平台上

移动计算能力:计算和存储采用就近原则(有效减少网络负载,降低网络拥塞)

bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.9.0.jar wordcount file:/opt/module/hadoop-2.6.0/LICENSE.txt file:/opt/output

概念:分布式计算作业放到NodeManager运行,其生成的日志信息放在各个NodeManager的本地目录中:

yarn.nodemanager.log-dirs=${yarn.log.dir}/userlogs

通过配置将本地日志放到hdfs服务器上,就是聚合日志

配置:yarn-site.xml

yarn.log-aggregation-enable

#是否启用日志聚合功能,日志聚合开启后保存到hdfs上

yarn.log-aggregation.retain-seconds

#聚合后的日志在hdfs上保存多长时间,单位s

yarn.log-aggregation.retain-check-interval-seconds

#删除任务在hdfs上执行的间隔,执行时候将满足条件的日志删除(超过 #保存时间的日志),如果是0或者负数,则为参数2设置值的1/10

yarn.nodemanager.log.retain-seconds

#当不启用日志聚合时此参数生效,日志文件保存在本地的时间,单位s

yarn.nodemanager.remote-app-log.dir

#当应用程序运行结束后,日志被转移到的hdfs目录(启用日志聚集功能 #时有效),修改为保存的日志文件夹

yarn.nodemanager.remove-app-log-dir-suffix

#远程日志目录子目录名称(启用日志聚合功能时有效)

作用 可以查看在yarn上执行的历史作业

启动 sbin/mr-jobhistory-daemon.sh start historyserver

WEB UI http://hostname:19888

停止 sbin/mr-jobhistory-daemon.sh stop historyserver

冷备份:b是a的冷备份,如果a坏掉,那么b不能马上代替a工作,但是b上存储a的一些信息,减少a坏掉之后的损失

热备份:b是啊的热备份,如果a坏掉,那么b马上运行代替a的工作

单机运行:在Windows上运行,跟集群无关

java.io.IOException:Could not locate executable null\bin\winutils.exe in the Hadoop binaries

配置HADOOP_HOME环境变量

下载winutils.exe和hadoop.dll文件,将他们放到${HADOOP_HOME}/bin目录下

//System.setProperty(hadoop.home.dir,e:/hadoop/);

 

java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

修改NativeIO类,将access0调用处直接换成true

 

远程调用运行:windows系统的代码直接连接linux上的hadoop集群环境进行运行,运行结果可以被保存到hdfs集群或windows磁盘上

Configuration conf = new Configuration();

conf.set(fs.defaultFS,hdfs://ip:port);

两种打jar包方式

Export->Runnable JAR file->选择第三个可以将class文件和lib分离

Run As->Maven build->输入clean package

执行命令bin/yarn jar xxx.jar 全类名 输入参数1 ...

输入分片其实是对一个文件进行逻辑上的分片,数据还是按照block的方式存储在hdfs中,而一个hdfs分片则是记录了该分片从文件的哪个位置开始,长度是多少,这些数据的位置在哪里。在读取分片数据的时候,是根据FileSplit类中的信息去读取相应的block数据,这也是为什么分片最好和block块大小相同的原因,如果一个分片的大小大于一个block的大小,则该分片可能会从其他节点的block读取数据,造成不必要的网络传输,导致处理时间增长(概念:输入分片(Input Split):在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组)

mapTask的数量=split的数量(不一定,mapTask数量可以通过参数设置,该情况极少)

如何计算split

map阶段对数据文件的切片,使用如下逻辑

long minSize = max{minSplitSize, mapred.min.split.size} 

long maxSize = mapred.max.split.size

long splitSize = computeSplitSize(blockSize, minSize, maxSize);

 

protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize));

}

其中,blockSize:Hadoop 2.x默认的block大小是128MB,Hadoop 1.x默认的block大小是64MB,可以在hdfs-site.xml中设置dfs.block.size,注意单位是byte

minSize默认是1b(mapred.min.split.size)- mapred-site.xml

maxSize默认值是Long.MaxValue(mapred.max.split.size)- mapred-site.xml

所以默认情况下split大小就是hdfs的blockSize,map数量就是split的数量

 

https://blog.csdn.net/dr_guo/article/details/51150278

附 :hadooop提供了一个设置mapred.map.tasks来控制map的个数。但是通过这种方式设置map的个数,并不是每次都有效的。原因是mapred.map.tasks只是一个hadoop的参考数值,最终map的个数,还取决于其他的因素。 为了方便介绍,先来看几个名词:

block_size : hdfs的文件块大小默认为128M,可以通过参数dfs.block.size设置

total_size : 输入文件整体的大小

input_file_num : 输入文件的个数

(1)默认map个数

  如果不进行任何设置,默认的map个数是和blcok_size相关的。

   default_num = total_size / block_size;(原因:默认map个数等于split个数(除非手动设置map个数才有可能),而split大小默认等于块大小)

(2)期望大小

  可以通过参数mapred.map.tasks来设置程序员期望的map个数,但是这个个数只有在大于default_num的时候,才会生效。

   goal_num=mapred.map.tasks;

(3)设置处理的文件大小

  可以通过mapred.min.split.size 设置每个task处理的文件大小,但是这个大小只有在大于block_size的时候才会生效。

    split_size=max(mapred.min.split.size,block_size);

    split_num=total_size/split_size;

(4)计算的map个数

compute_map_num=min(split_num,max(default_num, goal_num))

 

除了这些配置以外,mapreduce还要遵循一些原则。 mapreduce的每一个map处理的数据是不能跨越文件的,也就是说min_map_num >= input_file_num。所以,最终的map个数应该为:

     final_map_num=max(compute_map_num, input_file_num)

 

经过以上的分析,在设置map个数的时候,可以简单的总结为以下几点:

(1)如果想增加map个数,则设置mapred.map.tasks 为一个较大的值。

(2)如果想减小map个数,则设置mapred.min.split.size 为一个较大的值。

(3)如果输入中有很多小文件,依然想减少map个数,则需要将小文件merge为大文件,然后使用准则2

Shuffle的本义是洗牌,混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。

从Map输出到Reduce输入的整个过程可以广义地称为Shuffle。Shuffle横跨Map端和Reduce端,在Map端包括Spill写过程,在Reduce端包括copy和sort过程。

Spill过程包括输出、排序、溢写、合并等步骤。

程序会根据InputFormat将输入文件分割成splits,默认情况下每一个split都会对应一个mapTask,每个mapTask都会有一个kvbuffer用来临时存储map的输出结果,当写入kvbuffer的数据达到kvbuffer的阈值(默认是0.8,参数mapreduce.map.sort.spill.percent),就会启动后台线程将内存缓冲区中的数据以临时文件的方式写到磁盘上(这个过程就是溢写spill),同时剩下的0.2也继续写入maptask的输出的中间结果,互不干涉。当整个maptask完成之前,需要将每个maptask产生的spill文件各自合并成一个大文件,mapreduce.task.io.sort.factor(default:10),代表合并的时候最多同时合并多少个spill。

把kvbuffer中的数据按照partition值和key两个关键字升序排序移动的只是索引数据,排序结果时kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key排序。

将每一个map输出阶段溢写生成的多个spill文件各自合并成一个大文件,mapreduce.task.io.sort.factor(default:10),代表合并的时候最多同时合并多少个spill,这个过程是merge。

每个mapTask都会有一个内存缓冲区kvbuffer,就是一个字节数组。其作用就是批量收集map结果,减少磁盘IO的影响,通过参数mapreduce.task.io.sort.mb配置,默认是100M。

在kvbuffer中有map输出的数据和kvmeta索引。数据区和索引数据区域在缓冲区中时相邻不重叠的两个区域,用一个分界点来划分。分界点每次spill之后都会更新一次,出事的分界点时0,数据的存储方向是向上增长,索引数据的存储方向是向下增长。kvmeta索引占缓冲区大小的比例通过参数mapreduce.task.io.sort.record.percent配置,默认是0.05。

注:kvmeta索引是个四元组,包括:value起始位置、key起始位置、partition值、value长度

Reduce任务通过HTTP向各个Map任务拖取它所需要的数据,如果内存可以放下就直接放到内存中,每个map数据对应一块空间。当内存空间达到一定程序就启动内存merge,将数据输出到一个磁盘文件中。如果内存放不下就把map数据直接写到磁盘上,一个map数据就建一个文件,当文件数达到一定阈值,就启动磁盘merge合并到一个文件。最终对内存和磁盘上的数据进行全局合并。这里使用的merge和map端使用的merge过程一样。map的输出数据已经是有序的。merge进行一次合并排序,所谓reduce端的sort过程就是这个合并的过程。一般reduce是以便copy以便sort,即copy和sort两个阶段是重叠而不是完全分开的。

适用于大表join小表,使用DistrubutedCache机制将小表缓存到各个mapper进程所在机器的磁盘上,各个mapper进程读取不同的大表分片,将分片中的每一条记录与小表中所有记录进行合并,合并后直接输出mapper结果就可以得到最后重结果,不需要进行shuffle和reduce

 

fsimage:元数据镜像文件(文件系统目录树)

edits:元数据的操作日志(针对文件系统做的修改操作记录)

 

hdfs集群有主从节点:namenode管理节点、datanode工作节点

namenode:管理hdfs分布式文件系统的命名空间,维护文件系统树及整棵树内所有的文件和目录,这些信息以两个文件形式永久保存在本地磁盘上:命名空间镜像文件imagefile和编辑日志文件editlog。namenode也记录着每个文件中各个block所在的数据节点信息

 

由于namenode将hdfs的metadata存储在内存中,因此hdfs所能存储的文件总数受限于namenode的内存容量。一般每个文件、目录和数据块的存储信息大约占150字节

文档检索系统总最常用的数据结构,被广泛应用于全文搜索引擎,它主要存储某个单词或词组在一个文档或一组文档中的存储位置的映射,即提供一种根据内容来查找文档的方式。由于不是根据文档来确定文档所包含的内容,而是进行了相反的操作,故被称为倒排索引。

 

job.setInputFormatClass(TextInputFormat.class);

job.setMapperClass(Mapper.class);

job.setMapOutputKeyClass(LongWriable.class);

job.setMapOutputValueClass(Text.class);

job.setPartitionerClass(HashPartitioner.class);

job.setReducerClass(Reducer.class);

job.setOutputKeyClass(LongWriable.class);

job.setOutputValueClass(Text.class);

job.setOutputFormatClass(TextOutputFormat.class);

 

Reducers数量设置:

job.setNumReducerTasks();默认reducer数量为1

slots类似一个资源池,每一个map或task任务执行必须或得一个slot才能继续,否则只能等待,当一个任务完成就归还slot,这个过程类似于释放资源到资源池中。另外,mapreduce的任务是有tasktracker节点负责执行的,所以slots可进一步理解为tasktrackers能够并发执行多少个任务。slots分为mapper slots和reducer slots分别对应最大可执行的mapper数和reducer数。用户可以修改mapred-site.xml配置文件中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum来设置slots的值,默认为2。集群中可用的reducer slots总数等于集群中总结点数乘以每一个节点有多少个slots,后者由mapred.tasktracker.reduce.tasks.maximum的值决定。Reducer数目的最佳值和reducer slots的总数有关,通常情况下,让Reducer数目略小于reducer slots的总数,其目的为:所有的reducers可以并行执行,减少排队时间;对于未执行Reducer的slots可以再其他Reducer发生故障时立即分配给新创建的Reducer,不会明显地加长任务的总执行时间。在设置Reducers数目时需要Reducers<Mappers

 

 

减少中间结果键值对数量

Map计算过程中产生的中间结果键值对将需要通过网络传送给Reduce节点,因此如果程序产生大量的中间结果键值对,将导致网络数据通信量的大幅增加,即增加了网络通信开销了,又降低了程序执行速度。为了提供一个基本的减少键值对刷领的优化手段,mr设计并提供了Combiner类在每个Map节点上合并所产生的中间结果键值对

 

排序

Map计算过程结束后进行分区Partition处理时,系统自动按照Key的输出键进行排序,因此,进入Reduce节点的所有键值对将保证是按照key值排序的,而键值对后面的{values}列表则不保证是排序的,解决这个问题的办法是:将value中需要排序的部分加入到key中形成复合键,另外处理字符串拼接形成的复合主键的效率可能不是很高,可以采用一个专门自定义的数据类型来实现

 

用户定制数据类型

Hadoop内置的数据类型

BooleanWritable

ByteWritable

DoubleWritable

FLoatWritable

IntWritable

LongWritable

Text:使用utf8格式存储的文本

NullWritable

 

用户自定义数据类型的实现

实现Writable接口:该数据能被序列化后完成网络传输或文件输入输出

实现WritableComparable接口:如果该数据作为主键使用或需要比较数值大小时

 

用户定制输入输出格式

Hadoop内置的数据输入格式和RecordReader

数据输入格式InputFormat用于描述mr作业的数据输入规范

最常用的数据输入格式包括TextInputFormat和KeyValueTextInputFormat

TextInputFormat:系统默认的数据输入格式,可以将文本文件分块并逐行读入以便map节点处理,读入一行是,所产生的主键key就是当前行在整个文本文件中的字节偏移位置,而value就是该行的内容

KeyValueTextInputFormat:将一个按照<key,value>存放的文本文件逐行读出,并自动解析成相应的key和value

对于一个数据输入格式,都需要有一个对应的RecordReader,它主要用于将一个文件中的数据记录拆分成具体的键值对,传送给Map过程作为键值对输入参数。每个数据输入格式都有一个默认的RecordReader。TextInputFormat的默认RecordReader为LineRecordReader,KeyValueTextInputFormat的默认KeyValueTextInputFormat为KeyValueLineRecordReader

AutoInputFormat

CombineFIleInputFormat

CompositeInputFormat

DBInputFormat:提供从关系数据库读取数据的格式,对应DBRecordReader提供读取数据记录的接口

FileInputFormat

LIneDocInputFormat

MultiFileInputFormat

......

 

用户定制数据输入格式和RecordReader

 

Hadoop内置的数据输出格式和RecordWriter

数据输出格式InputFormat用于描述mr作业的数据输出规范

TextOutputFormat:系统默认的数据输出格式,将计算结果以key + \t + value的形式逐行输出到文本文件中,对应的RecordWriter为LineRecordWriter

DBOutputFormat

FileOutputFormat

FilterOutputFormat

MultipleTextOutputFormat:generateFileNameForKeyValue()   文件夹/

 

用户定制数据输出格式和RecordWriter

 

用户定制Partitioner和Combiner

Partitioner:自定义分区,决定Map节点的输出将被分区到哪个Reduce节点。mr默认提供的Partitioner是HashPartitioner,它根据每条数据记录的主键进行hash操作,获得一个非负整数的hash码,然后与当前作业的Reduce节点数(分区数)做取模运算

Combiner:减少Map过程中间结果键值对的数量,降低网络数据通信开销

 

组合式MapReduce作业

一些复杂任务难以用一趟mr处理完成,需要将其拆分成多躺简单的mr子任务处理

 

迭代MapReduce计算任务

顺序组合式mr作业的执行

具有复杂依赖关系的组合式mr作业的执行:JobControl jc = new JobControl(name);

mr前处理和后处理步骤的链式执行:ChainMapper ChainReducer

 

全局数据文件的传递:当一个数据源的数据量较小,能够存放在单个节点的内存中时,可以使用一个称为“复制连接”(Replicated Join)的全局文件复制方法,把较小的数据源文件复制到每个Map节点上。Hadoop提供了一个Distributed Cache机制将一个或多个文件分布复制到所有节点上

job.addCacheFIle(URI uri);//将一个文件存放到Distributed Cache文件中

mapper或reducer的context类中

Path[] cacheFiles = context.getLocalCacheFIles();

if(cacheFiles != null && cacheFiles .length > 0){

FileReader fr = new FIleReader(cacheFiles [0].toString());

}

 

协议

public interface LoginServiceProtocol {

final long versionID = 1l;

String login(String username, String passwd);

}

 

rpc服务端

 

public class LoginServiceImpl implements LoginServiceProtocol{

@Override

public String login(String username, String passwd) {

return "success";

}

 

}

 

Server server = new RPC.Builder(new Configuration())

.setBindAddress("localhost")

.setPort(10000)

.setProtocol(LoginServiceProtocol.class)

.setInstance(new LoginServiceImpl())

.build();

server.start();

 

rpc客户端

long clientVersion = 1l;

InetSocketAddress addr = new InetSocketAddress("localhost", 10000);

Configuration conf = new Configuration();

LoginServiceProtocol proxy = RPC.getProxy(LoginServiceProtocol.class, clientVersion, addr, conf);

System.out.println(proxy.login("1", "123456"));

 

hdfs dfsadmin -help <command>

配额quota

目录配额 hdfs dfsadmin 1 /dir  空目录

空间配额 hdfs dfsadmin -setSpaceQuota 1k /dir 目录下存储文件总大小不超过1k

快照snapshot(采用差异化存储,使用同一blockid)

下载namenode最新的镜像文件

hdfs dfsadmin -fetchImage .

HDFS离线镜像查看器oiv

offline-image-viewer

hdfs oiv查看帮助

hdfs oiv -i ~/.dfs/name/current/fsimage_0000000000000000056 -o ./fsi,age.xml -p XML

保存hdfs的主要数据结构,主要涉及block信息

hdfs dfsadmin -metasave xx 在hadoop.log.dir目录下创建文件xx

保存namespace并滚动edit,需要在安全模式下操作

hdfs dfsadmin -safemode enter

hdfs dfsadmin -saveNamepsace

hdfs dfsadmin -rollEdits 不需要进入安全模式

格式

工具

算法

扩展名

是否可切割

 

 

DEFLATE

N/A

DEFLATE

.deflate

 

 

 

gzip

gzip

DEFLATE

.gz

 

 

 

bzip2

bzip2

bzip2

.bz2

1

 

 

LZO

lzop

LZO

.lzo

 

 

 

LZ4

N/A

LZ4

.lz4

 

 

 

Snappy

N/A

Snappy

.snappy

 

 

 

 

 

 

 

 

 

 

本地压缩测试:

Configuration conf = new Configuration();

conf.set(“fs.defaultFS”,”file:///”); //本地

String codecClassname = args[0];

Class<?> codecClass = Class.forName(codecClassname);

Class[] clazzes = {DefaultCodec.class, DeflateCodec.class, BZip2Codec.class, GzipCodec.class, Lz4Codec.class, SnappyCodec.class, LzoCodec.class};

CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);

String extensionName = codec.getDefaultExtension();//扩展名

CompressionOutputStream out = codec.createOutputStream(System.out);

IOUilts.copyBytes(SYstem.in,out,4096,false);

out.finish();

序列文件,可切割,可压缩,有同步点(可以自行设定)

格式:

No compression每一行的组成

Header + Record(...) + Sync + Record(...) + Sync + Record(...)

其中Record就是

Record length

Key length

Key

Value

Record compression每一行的组成

Header + Record(...) + Sync + Record(...) + Sync + Record(...)

其中Record就是

Record length

Key length

Key

Compressed value

Block compression每一行的组成(就是将两个sync之间的所有record压缩)

Header + Sync + Block + Sync + Block + Sync + Block

其中Sync就是

Number of records

Compressed key lengths

Compressed keys

Compressed value lengths

Compressed values

创建SequenceFile文件

Configuration conf = new Configuration();

conf.set(“fs.defaultFS”,”file:///”); //本地

FileSystem fs = FileSystem.get(conf);

FileContext fc = FileContext.getFileContext(conf);

Path p = new Path(“d:/myseq.seq”);

CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(GzipCodec.class, conf);

Metadata metadata = new SequenceFile.Metadata();

SequenceFile.Writer writer = SequenceFile.createWriter(fc, conf, P, IntWritable.class, Text.class, SequenceFile.CompressionType.NONE, codec, metadata, EnumSet.of(CreateFlag.CREATE), Options.CreateOpts.blockSize(1024 * 1024));

IntWritable key = new IntWritable();

Text value = new Text();

for(int i = 0; i < 100; i ++){

key.set(i);

value.set(“matio-” + i);

writer.append(key, value);

writer.sync();//添加同步点

}

writer.close();

读取SequenceFile文件

Configuration conf = new Configuration();

conf.set(“fs.defaultFS”,”file:///”); //本地

Path p = new Path(“d:/myseq.seq”);

FileSystem fs = FileSystem.get(conf);

SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);

long pos = reader.getPosition();

//reader.seek(123);

IntWritable key = new IntWritable();

Text value = new Text();

while(reader.next(key, value)) {

pos = reader.getPosition();

System.out.println(“pos=” + pos + “,” + key.get() + “:” + value.toString());

}

reader.close();

SequenceFile的增强,key写入时必须有序,检索更快。

创建MapFile文件

Configuration conf = new Configuration();

conf.set(“fs.defaultFS”,”file:///”); //本地

FileSystem fs = FileSystem.get(conf);

String dirName = “d:/hadoop.mapfile”;

MapFile.Writer writer = new MapFile.Writer(conf, fs, dirName, IntWritable.class, Text.class, SequenceFile.CompressionType.NONE);

for(int i = 0; i < 100; i ++){

writer.append(new IntWritable(i), new Text(“matio” + i));

}

writer.close();

hdfs dfs -text file:///d:/hadoop.mapfile

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

long maxSize = getMaxSplitSize(job);

 

//get the lower bound on split size imposed by the format.

protected long getFormatMinSplitSize() {

return 1;

}

 

public static final String SPLIT_MINSIZE = “mapreduce.input.fileinputformat.split.minsize”; //默认为0

 

public static final String SPLIT_MAXSIZE = “mapreduce.input.fileinputformat.split.maxsize”;

 

public static long getMinSplitSize(JobContext job) {

return job.getConfiguration().getLong(SPLIT_MINSIZE, 1l);

}

 

long blockSize = file.getBlockSize();

long splitSize = computeSplitSize(blockSize, minSize, maxSize);

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {

return Math.max(minSize, Math.min(maxSize, blockSize));

}

配置

mapred-site.xml

<property>

<-- 配置切片的最小值 -->

<name>mapreduce.input.fileinputformat.split.minsize</name>

<value>0</value>

</property>

<property>

<-- 配置切片的最大值 -->

<name>mapreduce.input.fileinputformat.split.maxsize</name>

<value>0</value>

</property>

Configuration conf = new Configuration();

conf.set(LocalJobRunner.LOCAL_MAX_MAPS, “7”);

conf.set(“mapreduce.input.fileinputformat.split.minsize”, “”);

conf.set(“mapreduce.input.fileinputformat.split.maxsize”, “”);

conf.set(FileInputFormat.SPLIT_MINSIZE, “”);

conf.set(FileInputFormat.SPLIT_MAXSIZE, “”);

FileInputFormat.setMinInputSplitSize(job, 13);

FileInputFormat.setMaxInputSplitSize(job, 13);

 

 

 

www.htsjk.Com true http://www.htsjk.com/Hadoop/37942.html NewsArticle Hadoop,   Apache Lucene开源的高性能全文检索工具包 Apache Nutch开源的Web搜索引擎 Google三大论文(GFS/MapReduce/BigTable) Apache Hadoop大规模数据批处理 Hadoop Common为其他Hadoop模块提供基础设施(...
相关文章
    暂无相关文章
评论暂时关闭