欢迎投稿

今日深度:

hbase源码分析,

hbase源码分析,


1HBase的KeyValue分析

在HBase写入过程中,会检查Put中每个单元格Cell的KeyValue大小是否大于设置的maxKeyValueSize。要计算KeyValue的大小就需要了解KeyValue的的格式以及占用空间的计算方式。本文结合官方文档和源码对KeyValue的格式和空间计算进行分析和总结,便于后续继续分析HBase写入过程。
KeyValue的格式
KeyValue类是HBase中数据存储的核心,由keylength、valuelength、key、value四个部分组成,其中Key又由Row Length、Row、Column Family Length、Column Family、Column Qualifier、Time Stamp、Key Type七部分组成。
KeyValue不会在块之间拆分。例如,如果有一个8 MB的KeyValue,即使块大小是64kb,这个KeyValue将作为一个连贯块读取。



1、KeyLength存储Key的长度,占4B;
2、ValueLength存储Value的长度,4B;
3、Key存储具体的Cell数据:

1、 Row Length:存储rowkey的长度,占2B (Bytes.SIZEOF_INT);
2、 Row:存储Rowkey实际内容,其大小为Row Length ;
3、 Column Family Length:存储列簇Column Family的长度,占1B (Bytes.SIZEOF_BYTE); 
4、 Column Family:存储Column Family实际内容,大小为Column Family Length;
5、 Column Qualifier:存储Column Qualifier对应的数据。
6、 Time Stamp:存储时间戳Time Stamp,占8B (Bytes.SIZEOF_LONG);
7、 Key Type:存储Key类型Key Type,占1B ( Bytes.SIZEOF_BYTE),Type分为Put、Delete、DeleteColumn、DeleteFamilyVersion、DeleteFamily、Maximum、Minimum等类型,标记这个KeyValue的类型;
由于Key中其它的字段占用大小已经知道,并且知道整个Key的大小,因此没有存储Column Qualifier的大小。

4、Value:存储单元格Cell对应的实际的值Value。
示例:对于Put : rowkey=row1, cf:attr1=value1操作,Key对应关系如下:

rowlength -----------→ 4
row -----------------→ row1
columnfamilylength --→ 2
columnfamily --------→ cf
columnqualifier -----→ attr1
timestamp -----------→ server time of Put
keytype -------------→ Put

rowlength占用2B空间,因此解释了rowkey的最大长度不能超过64kb。
KeyValue占用空间计算
validatePut方法中会使用KeyValueUtil.length(cell)来检查每个Cell的大小是否大于maxKeyValueSize。因此涉及到如何计算KeyValue整个占用的空间大小。
KeyValue类中提供了getKeyValueDataStructureSize方法用于计算KeyValue的大小。

public static long getKeyValueDataStructureSize(int rlength,
      int flength, int qlength, int vlength) {
    return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
        + getKeyDataStructureSize(rlength, flength, qlength) + vlength;
  }

主要包含三部分:
1、KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE:等于keylength和valuelength占用空间大小之和,为8B
2、KeyDataStructureSize:整个Key结构的大小
KeyDataStructureSize
= KeyValue.KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength
= 12+ cell.getRowLength()+cell.getFamilyLength()+cell.getQualifierLength()
3、vlength:等于valuelength 的值,使用cell.getValueLength()获取

因此整个KeyValue占用的空间大小:

KeyValueDataStructureSize=20B+cell.getRowLength()+cell.getFamilyLength()+cell.getQualifierLength()

KeyValue实现
KeyValue使用是byte数组来存储实际实际内容,其大小使用createByteArray方法来确定,经过一系列的计算确定。

// KeyValue core instance fields.
//KeyValue相关的不变byte[]数组,存储KeyValue实际内容  
  protected byte [] bytes = null;  // an immutable byte array that contains the KV
// KeyValue在数组bytes的起始位置  
  protected int offset = 0;  // offset into bytes buffer KV starts at
// KeyValue在数组bytes自起始位置offset后的长度  
  protected int length = 0;  // length of the KV starting from offset.

KeyValue提供了一系列的Offset方法在数组中定位各个字段的的起始位置,如getValueOffset,getRowOffset等。也提供了一系列的length方法来获取KeyValue中各个字段的大小。
KeyValue类是HBase中数据存储的核心,通过分析KeyValue的结构和空间的计算方法,有利于指导HBase表结构的设计。HBase的rowkey以及columnfamily,columnqualifier在设计的时候越短越好,能够减少存储空间。之前在设计股市数据的表时,发现数据存入后占用空间是原始文件的6,7倍。字段多,但总记录数少,其占用空间居然超过了字段少,总记录条数多的文件,原因就是由于Column Qualifier字段太多,占用空间大于数据本身。

作者:ping_hu
来源:CSDN
原文:https://blog.csdn.net/ping_hu/article/details/77115998
版权声明:本文为博主原创文章,转载请附上博文链接!

2saveAsHadoopFile、saveAsNewAPIHadoopFile

saveAsHadoopFile、saveAsNewAPIHadoopFile两个算子来源于PairRDDFunctions。

saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。 可以指定outputKeyClass、outputValueClass以及压缩格式。
每个分区输出一个文件。
源码

/** saveAsHadoopFile */

def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  //1、 配置hadoopConf
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  conf.setOutputFormat(outputFormatClass)
  //2、 配置压缩
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapred.output.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
    hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
  }

  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }

  // 如果推测执行开启,并且输出提交类名包含“Direct”,
  // 我们应该警告用户,如果他们使用的是直接输出提交者,他们可能会丢失数据。
  val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
  val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
  if (speculationEnabled && outputCommitterClass.contains("Direct")) {
    val warningMessage =
      s"$outputCommitterClass may be an output committer that writes data directly to " +
        "the final location. Because speculation is enabled, this output committer may " +
        "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
        "committer that does not have this behavior (e.g. FileOutputCommitter)."
    logWarning(warningMessage)
  }

  //3、 设置输出路径
  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriter.createPathFromString(path, hadoopConf))
  //4、 调用saveAsHadoopDataset执行
  saveAsHadoopDataset(hadoopConf)
}
def saveAsHadoopFile(
path: String, 
keyClass: Class[_], 
valueClass: Class[_], 
outputFormatClass: Class[_ <: OutputFormat[_, _]], 
conf: JobConf = ..., 
codec: Option[Class[_ <: CompressionCodec]] = None
): Unit

def saveAsHadoopFile(
path: String, 
keyClass: Class[_], 
valueClass: Class[_], 
outputFormatClass: Class[_ <: OutputFormat[_, _]], 
codec: Class[_ <: CompressionCodec]
): Unit

案例
老版本API:org.apache.hadoop.mapred.TextOutputFormat

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)),2)
rdd1.saveAsHadoopFile("hdfs://leen:8020/test01",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

rdd1.saveAsHadoopFile("hdfs://leen:8020/test02",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],classOf[org.apache.hadoop.io.compress.GzipCodec])

结果

[leen@leen conf]$ hadoop fs -du -h hdfs://leen:8020/test01 
0 0 hdfs://leen:8020/test01/_SUCCESS 
8 8 hdfs://leen:8020/test01/part-00000 
12 12 hdfs://leen:8020/test01/part-00001

[leen@leen conf]$ hadoop fs -du -h hdfs://leen:8020/test02 
0 0 hdfs://leen:8020/test02/_SUCCESS 
28 28 hdfs://leen:8020/test02/part-00000.gz 
30 30 hdfs://leen:8020/test02/part-00001.gz

saveAsNewAPIHadoopFile
saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。 用法基本同saveAsHadoopFile。
源码

/**
   * a new Hadoop API `OutputFormat`(mapreduce.OutputFormat) 
   */
  def saveAsNewAPIHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = NewAPIHadoopJob.getInstance(hadoopConf)
    job.setOutputKeyClass(keyClass)
    job.setOutputValueClass(valueClass)
    job.setOutputFormatClass(outputFormatClass)
    val jobConfiguration = job.getConfiguration
    jobConfiguration.set("mapred.output.dir", path)
    saveAsNewAPIHadoopDataset(jobConfiguration)
  }
def saveAsNewAPIHadoopFile(
path: String, 
keyClass: Class[_], 
valueClass: Class[_], 
outputFormatClass: Class[F]
): Unit

def saveAsNewAPIHadoopFile(
path: String, 
keyClass: Class[_], 
valueClass: Class[_], 
outputFormatClass: Class[F], 
conf: Configuration
): Unit

案例

新版本API:org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)),2)
rdd1.saveAsNewAPIHadoopFile("hdfs://leen:8020/test03",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

如果需要压缩,则在hadoopConf中配置

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("mapred.output.compress", "true")
    hadoopConf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)

var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)),2)
rdd1.saveAsNewAPIHadoopFile("hdfs://leen:8020/test04",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],hadoopConf)
--------------------- 
作者:生命不息丶折腾不止 
来源:CSDN 
原文:https://blog.csdn.net/leen0304/article/details/78854530 
版权声明:本文为博主原创文章,转载请附上博文链接!

4LoadIncrementalHFiles

www.htsjk.Com true http://www.htsjk.com/hbase/42184.html NewsArticle hbase源码分析, 1HBase的KeyValue分析 在HBase写入过程中,会检查Put中每个单元格Cell的KeyValue大小是否大于设置的maxKeyValueSize。要计算KeyValue的大小就需要了解KeyValue的的格式以及占用空间的...
相关文章
    暂无相关文章
评论暂时关闭