欢迎投稿

今日深度:

elasticsearch+hadoop项目,

elasticsearch+hadoop项目,


系统核心架构设计

 

1、数据首先存入HDFS,可以通过Spark SQL直接导入到ES中,

HDFS中的数据量与ES中数据量大致相当。

2、Spark SQL可以直接通过建立Dataframe或者临时表连接ES,达到搜寻优化、减少数据量和筛选的目的,此时数据只在ES内存中而不在Spark SQL中。

3、筛选后的数据重新导入到Spark SQL中进行查询。

(整体流程代码见附录)

 

一、数据流程

1、数据在HDFS

   数据存储在HDFS上每个DataNodeblock块上。

 

 

2、数据加载到Spark SQL

1)数据从HDFS加载到Spark SQL中,以RDD形式存储。

 

2)添加数据结构信息转换为新的RDD

 

 

3)根据新的RDD创建DataFrame也即Dataset<Row>

 


 

 

 

左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schemaRDD是分布式的Java对象的集合。

 

DataFrame是分布式的Row对象的集合。

DataFrame引入了schemaoff-heap

schema : RDD每一行的数据, 结构都是一样的,这个结构就存储在schema中。 Spark通过schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。

 

off-heap: 意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)。Spark能够以二进制的形式序列化数据(不包括结构)off-heap中, 当要操作数据时,就直接操作off-heap内存。由于Spark理解schema,所以知道该如何操作。

 

DataSet结合了RDDDataFrame的优点,并带来的一个新的概念Encoder

当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。

 

4)由Dataset<Row>创建索引,并写入ES

 

3、数据在ES中建立索引

   

 

分片与副本:

一个ES索引由多个分片与副本组成,数据均匀分配到每个分片中,分片数多,则每个分片的数据量小;分片数少,则每个分片的数据量大。(具体分片和副本策略见ES优化

 

Document:

   ES中数据以Document为基本单位存储,Dataset<Row>中每条记录对应一个DocumentDocument随机存在于分片中。

(1)index: Document归属于哪个索引。

(2)type: Document归属于哪个类型。

(3)id:  Document的标识符(自动生成)。

(4)Version:版本号,系统默认。

4Spark SQL通过索引对ES中数据进行查询

 

Spark SQL通过pushdown机制将查询下推到ES数据源,由ES完成查询优化,再将结果返回SparkSQL中。

 

Push-Down operations

An important hidden feature of using elasticsearch-hadoop as a Spark source is that the connector understand the operations performed within the DataFrame/SQL and, by default, will translate them into the appropriate Elasticsearch Query DSL. In other words, the connector pushes down the operations directly at the source, where the data is efficiently filtered out so that only the required data is streamed back to Spark.

 

二、Spark SQLES优化

 

1Spark SQL优化

Spark SQL的性能调优选项主要有以下:

(1)spark.sql.codegen 默认值为false,当它设置为true时,Spark SQL会把每条查询的语句在运行时编译为java的二进制代码。这有什么作用呢?它可以提高大型查询的性能,但是如果进行小规模的查询的时候反而会变慢,就是说直接用查询反而比将它编译成为java的二进制代码快。所以在优化这个选项的时候要视情况而定。

 

(2)spark.sql.inMemoryColumnStorage.compressed 默认值为false 它的作用是自动对内存中的列式存储进行压缩。

 

3spark.sql.inMemoryColumnStorage.batchSize 默认值为1000 这个参数代表的是列式缓存时的每个批处理的大小。SaprkSQL会按照这个选项制定的大小把记录(每条记录即为一个Row对象)分组,然后分批压缩。如果将这个值调大可能会导致内存不够的异常,所以在设置这个的参数的时候得注意你的内存大小,一般情况下默认的1000就可以了。

 

 

 

 

 

2、ES优化

1.1分片策略

   主分片,副本和节点最大数之间数量存在以下关系:

节点数<=主分片数*(副本数+1

分片数等于节点数时,搜索效率最大。

 

1.2避免索引稀疏

1)避免把无关联的数据放在同一个index

      不要把完全不同的数据结构 document 放在同一个 index 里面。最好是将这些 document 放到不同的index里面,可以考虑创建一些较小的index, 用较少的shard去存储。

2)对于不同的字段禁用 norms doc_values

       通常只用于过滤而不需要进行打分(匹配度打分)的字段,可以直接禁用 norms 。不用于排序或者聚合的字段可以禁用 doc_values

1.3调优索引速度

1)增加 refresh_interval 刷新的间隔时间

        index.refresh_interval的默认值是 1s,这迫使Elasticsearch集群每秒创建一个新的 segment (可以理解为Lucene 的索引文件)。增加这个值,例如30s,可以允许更大的segment写入,减后以后的segment合并压力。

 

2)在初始化索引时,可以禁用 refresh replicas 数量

        如果需要一次加载较大的数据量进 index 里面时,可以先禁用 refresh ,把 index.refresh_interval 设置成为 -1 ,把 index.number_of_replicas 设置成 0。暂时把多个shard副本关闭(即如果当前index发生损坏便用丢失数据),但是这样做可以大大加快索引速度。当初始化索引完成,可以将 index.refresh_interval  index.number_of_replicas 设置回原来的值。

 

3)禁用 swapping

       把操作系统的虚拟内存交换区关闭。sysctl 里面添加 vm.swappiness = 1 .

 

 

 

1、测试的部分代码

   1)根据实例数据创建JavaBean

 

 

2SparkSQLHDFS上读取数据,并加载数据,在ES中创建索引。

 

3SaprkSQL读取索引,进行SQL查询。


www.htsjk.Com true http://www.htsjk.com/Elasticsearch/35976.html NewsArticle elasticsearch+hadoop项目, 系统核心架构设计   1、 数据首先存入 HDFS, 可以通过 Spark SQL 直接导入到 ES 中, HDFS 中的数据量与 ES 中数据量大致相当。 2、Spark SQL 可以直接通过建立 Dataframe...
相关文章
    暂无相关文章
评论暂时关闭