欢迎投稿

今日深度:

Hive优化,

Hive优化,


优化手段

合理控制Map和Reduce数

合并小文件

避免数据倾斜,解决数据倾斜

减少job数(合并Job、大Job分拆……)

 

一、  Map数和Reduce数

Hive官方:

https://cwiki.apache.org/confluence/display/Hive/Home

 

1.1、Map数

Map数过大

    Map阶段输出文件太小,产生大量小文件

     初始化和创建Map的开销很大

 

Map数太小

         文件处理或查询并发度小,Job执行时间过长

         大量作业时,容易堵塞集群

 

通常情况下,作业会通过input文件产生一个或者多个map数

主要的决定因素有: input的文件数,input的文件大小。

 

•举例

   a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块,Block128M),从而产生7个map数

b) 假设input目录下有3个文件a,b,c,大小分别为10m20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数

 

 

两种方式控制Map数:即减少map数和增加map数

减少map数可以通过合并小文件来实现,这点是对文件源

增加map数的可以通过控制上一个jobreduer数来控制(一个sqljoin多个表会分解为多个mapreduce)

 

 

小文件进行合并:

Map阶段Hive(0.7版本之后)自动对小文件合并

 

对应参数和默认值:

set hive.merge.mapfiles = true #在Map-only的任务结束时合并小文件

set hive.merge.mapredfiles = true #默认是false,true时在Map-Reduce的任务结束时合并小文件

set hive.merge.size.per.task= 256*1000*1000 #合并文件的大小

set mapred.max.split.size=256000000;  #每个Map最大分割大小

 

setmapred.min.split.size.per.node=100000000; #一个节点上split的最少值

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;  #执行Map前进行小文件合并

 

set的作用域是Session

 

 

在开启了org.apache.hadoop.hive.ql.io.CombineHiveInputFormat后,一个data node节点上多个小文件会进行合并,合并文件数由mapred.max.split.size限制的大小决定。

mapred.min.split.size.per.node决定了多个datanode上的文件是否需要合并~

 

1.1.2 、参数:mapred.map.tasks

Hive中setmapred.map.tasks=100;  当文件小于

 

案例:

环境如下:

hive.merge.mapredfiles=true (默认是false,可以在hive-site.xml里配置)

hive.merge.mapfiles=true

hive.merge.size.per.task=256000000

mapred.map.tasks=2

 

因为合并小文件默认为true,而dfs.block.size与hive.merge.size.per.task的搭配使得合并后的绝大部分文件都在256MB左右。

 

Case 1:

 

现在我们假设有3个300MB大小的文件,

整个JOB会有6个map,其中3个map分别处理256MB的数据,还有3个map分别处理44MB的数据。

木桶效应就来了,整个JOB的map阶段的执行时间不是看最短的1个map的执行时间,而是看最长的1个map的执行时间。所以,虽然有3个map分别只处理44MB的数据,可以很快跑完,但它们还是要等待另外3个处理256MB的map。显然,处理256MB的3个map拖了整个JOB的后腿。

 

Case 2:

 

如果我们把mapred.map.tasks设置成6,再来看一下有什么变化:

goalsize = min(900MB/6,256MB) = 150MB

整个JOB同样会分配6个map来处理,每个map处理150MB的数据,非常均匀,谁都不会拖后腿,最合理地分配了资源,执行时间大约为CASE 1的59%(150/256)

 

1.2、reduce数

Reduce数过大

生成了很多个小文件(最终输出文件有reduce决定,一个reduce一个文件),那么如果这些小文件作为下一个Job输入,则也会出现小文件过多需要进行合并的问题。

启动和初始化reduce也会消耗时间和资源;有多少个reduce就会有多少个输出文件

 

Reduce数过低

执行耗时

可能出现数据倾斜

 

 

 

 

Reducer个数的决定

默认下,Hive分配reducer个数基于以下:

    参数1:hive.exec.reducers.bytes.per.reducer(默认为1G)

参数2 :hive.exec.reducers.max(默认为999)

•计算reducer数的公式

•N=min(参数2,总输入数据量/参数1)

即默认一个reduce处理1G数据量

 

 

•什么情况下只有一个reduce

    很多时候你会发现任务中不管数据量多大,不管你有没有设置调整reduce个数的参数,任务中一直都只有一个reduce任务;

1、  除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外

2、没有group by的汇总

3、用了Order by。

 

1.2.1 、设置Reduce数

参数:mapred.reduce.tasks

默认:1

 

set mapred.reduce.tasks=10.. ;

作用域Session级

 

当某个Job的结果别后边Job多次引用时,设大该参数,以便增大访问的Map数。

 

以第一讲、第二讲的代码为例。

 

Reduce数决定中间或落地文件数,文件大小和Block大小无关。

 

 

二、数据倾斜

1、什么是数据倾斜?Hadoop框架的特性决定最怕数据倾斜

JobTracker  TaskTracker…..

老师          学生

 

•由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点。

 

症状:map阶段快,reduce阶段非常慢;

     某些map很快,某些map很慢;

     某些reduce很快,某些reduce奇慢

 

如下情况:

A、数据在节点上分布不均匀

B、join时on关键词中个别值量很大(如null值)

C、count(distinct ),在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by 字段分组,按distinct字段排序。

 

其中A无法避免。B见后边的Join章节。C语法上有时无法避免

 

 

 

 

三、  Join 、MapJoin、Group by

Join

按照join的key进行分发,而在join左边的表的数据会首先读入内存,如果左边表的key相对分散(或少,分散的意思是相同key的数据量小),读入内存的数据会比较小,join任务执行会比较快;而如果左边的表key比较集中,而这张表的数据量很大,那么数据倾斜就会比较严重。

Map阶段同一Key数据分发给一个reduce

 

Join原则:

小表 Join 大表,原因是在 Join 操作的Reduce 阶段(不是Map阶段),位于 Join左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存溢出的几率。

多个表关联时,最好分拆成小段,避免大sql(无法控制中间Job

 

多表 Join on条件相同时合并为一个 Map-Reduce做 OUTER JOIN 的时候也是一样,查看执行计划explain

比如查询,3个表关联:

select pt.page_id,count(t.url) PV

 fromrpt_page_type pt

 join

 (select url_page_id,url from trackinfo whereds='2013-10-11') t

 onpt.page_id=t.url_page_id

 join

 (select page_id from rpt_page_kpi_new whereds='2013-10-11') r

 ont.url_page_id=r.page_id

 group by pt.page_id;

 

比较2个表关联:

select pt.page_id,count(t.url) PV

 fromrpt_page_type pt

 join

 (select url_page_id,url from trackinfo whereds='2013-10-11') t

 onpt.page_id=t.url_page_id

group by pt.page_id;

 

利用这个特性,可以把相同join on条件的放在一个job处理。

 

 

如果 Join 的条件不相同,如:

 

INSERT OVERWRITE TABLE page_pv

select pt.page_id,count(t.url) PV

 fromrpt_page_type pt

 join

 (select url_page_id,url,province_id fromtrackinfo where ds='2013-10-11') t

 onpt.page_id=t.url_page_id

 join

 (select page_id,province_id fromrpt_page_kpi_new where ds='2013-10-11') r

 ont.province_id=r.province_id

 group by pt.page_id;

 

大表Join大表

访户未登录时,日志中userid是空,在用user_id进行hash分桶的时候,会将日志中userid为空的数据分到一起,导致了过大空key造成倾斜。

 

解决办法:

         把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的 reduce上,由于null值关联不上,处理后并不影响最终结果

案例:

End_user  5000万,纬度表

Trackinfo  每日2亿,按日增量表

 

原写法:

select u.id,t.url,t.track_time

 fromend_user u

 join

 (select end_user_id,url,track_time fromtrackinfo where ds='2013-12-01') t

 onu.id=t.end_user_id limit 2;

 

调整为:

select u.id,t.url,t.track_time

 fromend_user u

 join

(select case whenend_user_id='null' or end_user_id is null

             then cast(concat('00000000_',floor(rand()*1000000)) as bigint)

             else end_user_id end end_user_id ,

       url,track_time

  from trackinfo where ds='2013-12-01') t

 onu.id=t.end_user_id limit 2;

 

MapJoin

Join 操作在 Map 阶段完成,如果需要的数据在Map 的过程中可以访问到则不再需要Reduce。

 

小表关联一个超大表时,容易发生数据倾斜,可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

如:

INSERT OVERWRITETABLE page_pv

select /*+ MAPJOIN(pt) */

pt.page_id,count(t.url) PV

 from rpt_page_type pt

 join

 (select url_page_id,url from trackinfo whereds='2013-10-11') t

 on pt.page_id=t.url_page_id;

 

 

 

如果是小表,能否自动选择Mapjoin?

set hive.auto.convert.join = true; 默认为false

该参数为true时,Hive自动对左边的表统计量,如果是小表就加入内存,即对小表使用Map join

大表小表的阀值:

hive> set hive.mapjoin.smalltable.filesize;

hive.mapjoin.smalltable.filesize=25000000

默认值是25mb

 

 

hive.mapjoin.cache.numrows

•说明: mapjoin 存在内存里的数据量

•默认值:25000

 

hive.mapjoin.cache.numrows

•说明: mapjoin 存在内存里的数据量

•默认值:25000

 

hive.mapjoin.followby.gby.localtask.max.memory.usage

•说明:map join做group by 操作时,可以使用多大的内存来存储数据,如果数据太大,则不会保存在内存里

•默认值:0.55

hive.mapjoin.localtask.max.memory.usage

•说明:本地任务可以使用内存的百分比

•默认值: 0.90

 

 

 

 

 

Group By

 

   Map 端部分聚合:

       并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果。

       基于 Hash

       参数包括:

           hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True

           hive.groupby.mapaggr.checkinterval = 100000 在 Map 端进行聚合操作的条目数目

 

默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了

 

    有数据倾斜的时候进行负载均衡

        hive.groupby.skewindata = false

       当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

 

Count(distinct) 容易倾斜,当该字段存在大量值为NULL或空的记录

解决思路 

count distinct时,将值为空的数据在where里过滤调,在最后结果中加1

 

如:

         count(distinct end_user_id) as user_num

         修正为

         cast(count(distinctend_user_id)+1 as bigint) as user_num

where  end_user_id is not nulland query <> ''

 

 

 

笛卡尔积

尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积

On中支持udf函数

 

 

 

三、减少job数

源表相同时,如下可以合并Job

 

union all

Multi-Insert(Multi-GroupBy一定会和Multi-Insert一起使用,同一源表,可按照不同where、不同group by )

 

 

union all

对同一表的union all 只查一次源表,hive本身对这种union all做过优化。

但子查询不能group by

 

select url,session_id from

(select url,session_id

 fromtrackinfo where ds='2013-11-01'

union all

 select url,session_id

 fromtrackinfo where ds='2013-11-02'

) t ;

 

 

 

Multi-Insert

上方的sql等同于:

 

From trackinfo

 insertoverwrite table tmp_test partition(step=1)  

  selecturl,session_id where ds='2013-11-01'

 insertoverwrite table tmp_test partition(step=2)  

  selecturl,session_id  where ds='2013-11-02' ;

 

 

Hive数据倾斜解决方法总结

      数据倾斜是进行大数据计算时最经常遇到的问题之一。当我们在执行HiveQL或者运行MapReduce作业时候,如果遇到一直卡在map100%,reduce99%一般就是遇到了数据倾斜的问题。数据倾斜其实是进行分布式计算的时候,某些节点的计算能力比较强或者需要计算的数据比较少,早早执行完了,某些节点计算的能力较差或者由于此节点需要计算的数据比较多,导致出现其他节点的reduce阶段任务执行完成,但是这种节点的数据处理任务还没有执行完成。

  在hive中产生数据倾斜的原因和解决方法:

  1)group by,我使用Hive对数据做一些类型统计的时候遇到过某种类型的数据量特别多,而其他类型数据的数据量特别少。当按照类型进行group by的时候,会将相同的group by字段的reduce任务需要的数据拉取到同一个节点进行聚合,而当其中每一组的数据量过大时,会出现其他组的计算已经完成而这里还没计算完成,其他节点的一直等待这个节点的任务执行完成,所以会看到一直map 100%  reduce 99%的情况。

  解决方法:set hive.map.aggr=true

       set hive.groupby.skewindata=true

  原理:hive.map.aggr=true 这个配置项代表是否在map端进行聚合

     hive.groupby.skwindata=true 当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

  2)map和reduce优化。

    1.当出现小文件过多,需要合并小文件。可以通过set hive.merge.mapfiles=true来解决。

       2.单个文件大小稍稍大于配置的block块的大写,此时需要适当增加map的个数。解决方法:set mapred.map.tasks个数

       3.文件大小适中,但map端计算量非常大,如select id,count(*),sum(case when...),sum(case when...)...需要增加map个数。解决方法:set mapred.map.tasks个数,set mapred.reduce.tasks个数

  3)当HiveQL中包含count(distinct)时

         如果数据量非常大,执行如select a,count(distinct b) from t group by a;类型的SQL时,会出现数据倾斜的问题。

         解决方法:使用sum...group by代替。如select a,sum(1) from (select a, b from t group by a,b) group by a;

  4)当遇到一个大表和一个小表进行join操作时。

    解决方法:使用mapjoin 将小表加载到内存中。

    如:select /*+ MAPJOIN(a) */ 

          a.c1, b.c1 ,b.c2

     from a join b 

     where a.c1 = b.c1; 

  5)遇到需要进行join的但是关联字段有数据为空,如表一的id需要和表二的id进行关联

     解决方法1:id为空的不参与关联

    比如:select * from log a 

      join users b 

      on a.id is not null and a.id = b.id 

       union all 

       select * from log a 

      where a.id is null; 

   解决方法2:给空值分配随机的key值

      如:select * from log a 

        left outer join users b 

        on 

        case when a.user_id is null 

        then concat(‘hive’,rand() ) 

        else a.user_id end = b.user_id; 

 

 

 

1数据倾斜的原因

1.1操作:

1.2原因:

1)、key分布不均匀

2)、业务数据本身的特性

3)、建表时考虑不周

4)、某些SQL语句本身就有数据倾斜

1.3表现:

任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。

单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。 最长时长远大于平均时长。

2数据倾斜的解决方案

2.1参数调节:

hive.map.aggr=true

Map 端部分聚合,相当于Combiner

hive.groupby.skewindata=true

有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

2.2 SQL语句调节:

如何Join:

关于驱动表的选取,选用join key分布最均匀的表作为驱动表

做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。

大小表Join:

使用map join让小的维度表(1000条以下的记录条数) 先进内存。在map端完成reduce.

大表Join大表

把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

count distinct大量相同特殊值

count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

group by维度过小:

采用sum() group by的方式来替换count(distinct)完成计算。

特殊情况特殊处理:

在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。

3典型的业务场景

3.1空值产生的数据倾斜

场景:如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的user_id 关联,会碰到数据倾斜的问题。

解决方法1: user_id为空的不参与关联

select * from log a
  join users b
  on a.user_id is not null
  and a.user_id = b.user_id
union all
select * from log a
  where a.user_id is null;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

解决方法2 :赋与空值分新的key值

select *
  from log a
  left outer join users b
  on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
  • 1
  • 2
  • 3
  • 4

结论:方法2比方法1效率更好,不但io少了,而且作业数也少了。解决方法1中 log读取两次,jobs是2。解决方法2 job数是1 。这个优化适合无效 id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。

3.2不同数据类型关联产生数据倾斜

场景:用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。

解决方法:把数字类型转换成字符串类型

select * from users a
  left outer join logs b
  on a.usr_id = cast(b.user_id as string)
  • 1
  • 2
  • 3

3.3小表不小不大,怎么用 map join 解决倾斜问题

使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理。 以下例子:

select * from log a
  left outer join users b
  on a.user_id = b.user_id;
  • 1
  • 2
  • 3

users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

解决方法

select /*+mapjoin(x)*/* from log a
  left outer join (
    select  /*+mapjoin(c)*/d.*
      from ( select distinct user_id from log ) c
      join users d
      on c.user_id = d.user_id
    ) x
  on a.user_id = b.user_id;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。 
4总结

使map的输出数据更均匀的分布到reduce中去,是我们的最终目标。由于Hash算法的局限性,按key Hash会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。在此给出较为通用的步骤:

1、采样log表,哪些user_id比较倾斜,得到一个结果表tmp1。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样是并不可少的。

2、数据的分布符合社会学统计规则,贫富不均。倾斜的key不会太多,就像一个社会的富人不多,奇特的人不多一样。所以tmp1记录数会很少。把tmp1和users做map join生成tmp2,把tmp2读到distribute file cache。这是一个map过程。

3、map读入users和log,假如记录来自log,则检查user_id是否在tmp2里,如果是,输出到本地文件a,否则生成

 

 

 

 

······································································································································································

有时候用hive读取外表数据时,比如csv这种类型的,需要跳过行首或者行尾一些和数据无关的或者自动生成的多余信息,这里可以用属性设置来实现,快速mark下,建表的时候设置如下

Create external table testtable (name string, message string) row format delimited fields terminated by '\t' lines terminated by '\n' location '/user/file.csv' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2");

对,就是上面sql中tblproperties的2个属性

“skip.heaer.line.count” 跳过文件行首多少行

“skip.footer.line.count”跳过文件行尾多少行

注意,这个属性的功能是hive0.13以后的都可以支持

www.htsjk.Com true http://www.htsjk.com/hive/39415.html NewsArticle Hive优化, 优化手段 合理控制Map和Reduce数 合并小文件 避免数据倾斜,解决数据倾斜 减少job数(合并Job、大Job分拆……)   一、  Map数和Reduce数 Hive官方: https://cwiki.apache.org/confluence...
相关文章
    暂无相关文章
评论暂时关闭