hbase源码分析,
1HBase的KeyValue分析
在HBase写入过程中,会检查Put中每个单元格Cell的KeyValue大小是否大于设置的maxKeyValueSize。要计算KeyValue的大小就需要了解KeyValue的的格式以及占用空间的计算方式。本文结合官方文档和源码对KeyValue的格式和空间计算进行分析和总结,便于后续继续分析HBase写入过程。
KeyValue的格式
KeyValue类是HBase中数据存储的核心,由keylength、valuelength、key、value四个部分组成,其中Key又由Row Length、Row、Column Family Length、Column Family、Column Qualifier、Time Stamp、Key Type七部分组成。
KeyValue不会在块之间拆分。例如,如果有一个8 MB的KeyValue,即使块大小是64kb,这个KeyValue将作为一个连贯块读取。
1、KeyLength存储Key的长度,占4B;
2、ValueLength存储Value的长度,4B;
3、Key存储具体的Cell数据:
1、 Row Length:存储rowkey的长度,占2B (Bytes.SIZEOF_INT);
2、 Row:存储Rowkey实际内容,其大小为Row Length ;
3、 Column Family Length:存储列簇Column Family的长度,占1B (Bytes.SIZEOF_BYTE);
4、 Column Family:存储Column Family实际内容,大小为Column Family Length;
5、 Column Qualifier:存储Column Qualifier对应的数据。
6、 Time Stamp:存储时间戳Time Stamp,占8B (Bytes.SIZEOF_LONG);
7、 Key Type:存储Key类型Key Type,占1B ( Bytes.SIZEOF_BYTE),Type分为Put、Delete、DeleteColumn、DeleteFamilyVersion、DeleteFamily、Maximum、Minimum等类型,标记这个KeyValue的类型;
由于Key中其它的字段占用大小已经知道,并且知道整个Key的大小,因此没有存储Column Qualifier的大小。
4、Value:存储单元格Cell对应的实际的值Value。
示例:对于Put : rowkey=row1, cf:attr1=value1操作,Key对应关系如下:
rowlength -----------→ 4
row -----------------→ row1
columnfamilylength --→ 2
columnfamily --------→ cf
columnqualifier -----→ attr1
timestamp -----------→ server time of Put
keytype -------------→ Put
rowlength占用2B空间,因此解释了rowkey的最大长度不能超过64kb。
KeyValue占用空间计算
validatePut方法中会使用KeyValueUtil.length(cell)来检查每个Cell的大小是否大于maxKeyValueSize。因此涉及到如何计算KeyValue整个占用的空间大小。
KeyValue类中提供了getKeyValueDataStructureSize方法用于计算KeyValue的大小。
public static long getKeyValueDataStructureSize(int rlength,
int flength, int qlength, int vlength) {
return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
+ getKeyDataStructureSize(rlength, flength, qlength) + vlength;
}
主要包含三部分:
1、KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE:等于keylength和valuelength占用空间大小之和,为8B
2、KeyDataStructureSize:整个Key结构的大小
KeyDataStructureSize
= KeyValue.KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength
= 12+ cell.getRowLength()+cell.getFamilyLength()+cell.getQualifierLength()
3、vlength:等于valuelength 的值,使用cell.getValueLength()获取
因此整个KeyValue占用的空间大小:
KeyValueDataStructureSize=20B+cell.getRowLength()+cell.getFamilyLength()+cell.getQualifierLength()
KeyValue实现
KeyValue使用是byte数组来存储实际实际内容,其大小使用createByteArray方法来确定,经过一系列的计算确定。
// KeyValue core instance fields.
//KeyValue相关的不变byte[]数组,存储KeyValue实际内容
protected byte [] bytes = null; // an immutable byte array that contains the KV
// KeyValue在数组bytes的起始位置
protected int offset = 0; // offset into bytes buffer KV starts at
// KeyValue在数组bytes自起始位置offset后的长度
protected int length = 0; // length of the KV starting from offset.
KeyValue提供了一系列的Offset方法在数组中定位各个字段的的起始位置,如getValueOffset,getRowOffset等。也提供了一系列的length方法来获取KeyValue中各个字段的大小。
KeyValue类是HBase中数据存储的核心,通过分析KeyValue的结构和空间的计算方法,有利于指导HBase表结构的设计。HBase的rowkey以及columnfamily,columnqualifier在设计的时候越短越好,能够减少存储空间。之前在设计股市数据的表时,发现数据存入后占用空间是原始文件的6,7倍。字段多,但总记录数少,其占用空间居然超过了字段少,总记录条数多的文件,原因就是由于Column Qualifier字段太多,占用空间大于数据本身。
作者:ping_hu
来源:CSDN
原文:https://blog.csdn.net/ping_hu/article/details/77115998
版权声明:本文为博主原创文章,转载请附上博文链接!
2saveAsHadoopFile、saveAsNewAPIHadoopFile
saveAsHadoopFile、saveAsNewAPIHadoopFile两个算子来源于PairRDDFunctions。
saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。 可以指定outputKeyClass、outputValueClass以及压缩格式。
每个分区输出一个文件。
源码
/** saveAsHadoopFile */
def saveAsHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
//1、 配置hadoopConf
val hadoopConf = conf
hadoopConf.setOutputKeyClass(keyClass)
hadoopConf.setOutputValueClass(valueClass)
conf.setOutputFormat(outputFormatClass)
//2、 配置压缩
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
hadoopConf.set("mapred.output.compress", "true")
hadoopConf.setMapOutputCompressorClass(c)
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
// Use configured output committer if already set
if (conf.getOutputCommitter == null) {
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
}
// 如果推测执行开启,并且输出提交类名包含“Direct”,
// 我们应该警告用户,如果他们使用的是直接输出提交者,他们可能会丢失数据。
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =
s"$outputCommitterClass may be an output committer that writes data directly to " +
"the final location. Because speculation is enabled, this output committer may " +
"cause data loss (see the case in SPARK-10063). If possible, please use an output " +
"committer that does not have this behavior (e.g. FileOutputCommitter)."
logWarning(warningMessage)
}
//3、 设置输出路径
FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
//4、 调用saveAsHadoopDataset执行
saveAsHadoopDataset(hadoopConf)
}
def saveAsHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = ...,
codec: Option[Class[_ <: CompressionCodec]] = None
): Unit
def saveAsHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
codec: Class[_ <: CompressionCodec]
): Unit
案例
老版本API:org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)),2)
rdd1.saveAsHadoopFile("hdfs://leen:8020/test01",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
rdd1.saveAsHadoopFile("hdfs://leen:8020/test02",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],classOf[org.apache.hadoop.io.compress.GzipCodec])
结果
[leen@leen conf]$ hadoop fs -du -h hdfs://leen:8020/test01
0 0 hdfs://leen:8020/test01/_SUCCESS
8 8 hdfs://leen:8020/test01/part-00000
12 12 hdfs://leen:8020/test01/part-00001
[leen@leen conf]$ hadoop fs -du -h hdfs://leen:8020/test02
0 0 hdfs://leen:8020/test02/_SUCCESS
28 28 hdfs://leen:8020/test02/part-00000.gz
30 30 hdfs://leen:8020/test02/part-00001.gz
saveAsNewAPIHadoopFile
saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。 用法基本同saveAsHadoopFile。
源码
/**
* a new Hadoop API `OutputFormat`(mapreduce.OutputFormat)
*/
def saveAsNewAPIHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val job = NewAPIHadoopJob.getInstance(hadoopConf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
val jobConfiguration = job.getConfiguration
jobConfiguration.set("mapred.output.dir", path)
saveAsNewAPIHadoopDataset(jobConfiguration)
}
def saveAsNewAPIHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F]
): Unit
def saveAsNewAPIHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F],
conf: Configuration
): Unit
案例
新版本API:org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)),2)
rdd1.saveAsNewAPIHadoopFile("hdfs://leen:8020/test03",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
如果需要压缩,则在hadoopConf中配置
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("mapred.output.compress", "true")
hadoopConf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)),2)
rdd1.saveAsNewAPIHadoopFile("hdfs://leen:8020/test04",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],hadoopConf)
---------------------
作者:生命不息丶折腾不止
来源:CSDN
原文:https://blog.csdn.net/leen0304/article/details/78854530
版权声明:本文为博主原创文章,转载请附上博文链接!
4LoadIncrementalHFiles