欢迎投稿

今日深度:

hive调优,

hive调优,


hive大数据倾斜总结:点击链接

Hive的map和reduce数调整:原文链接

https://yq.aliyun.com/articles/59635

一、    控制hive任务中的map数: 
1.    通常情况下,作业会通过input的目录产生一个或者多个map任务。 
主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);
2.    举例: 
a)    假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数
b)    假设input目录下有3个文件a,b,c,大小分别为10m,20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数
即,如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块。
3.    是不是map数越多越好? 
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,
而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。
而且,同时可执行的map数是受限的。
4.    是不是保证每个map处理接近128m的文件块,就高枕无忧了? 
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,
如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
针对上面的问题3和4,我们需要采取两种方式来解决:即减少map数和增加map数;
如何合并小文件,减少map数? 
    假设一个SQL任务:
         Select count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’;
         该任务的inputdir  /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04
         共有194个文件,其中很多是远远小于128m的小文件,总大小9G,正常执行会用194个map任务。
         Map总共消耗的计算资源: SLOTS_MILLIS_MAPS= 623,020
         我通过以下方法来在map执行前合并小文件,减少map数:
         set mapred.max.split.size=100000000;
                    set mapred.min.split.size.per.node=100000000;
                    set mapred.min.split.size.per.rack=100000000;
                    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
                 再执行上面的语句,用了74个map任务,map消耗的计算资源:SLOTS_MILLIS_MAPS= 333,500
         对于这个简单SQL任务,执行时间上可能差不多,但节省了一半的计算资源。
         大概解释一下,100000000表示100M, set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;这个参数表示执行前进行小文件合并,
         前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),
         进行合并,最终生成了74个块。      
如何适当的增加map数? 
         当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
         假设有这样一个任务:
         Select data_desc,
                count(1),
                count(distinct id),
                sum(case when …),
                sum(case when ...),
                sum(…)
        from a group by data_desc
                   如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,
                   这样就可以用多个map任务去完成。
                   set mapred.reduce.tasks=10;
                   create table a_1 as 
                   select * from a 
                   distribute by rand(123); 
                   
                   这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。
                   每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。
   看上去,貌似这两种有些矛盾,一个是要合并小文件,一个是要把大文件拆成小文件,这点正是重点需要关注的地方,
   根据实际情况,控制map数量需要遵循两个原则:使大数据量利用合适的map数;使单个map任务处理合适的数据量;
二、    控制hive任务的reduce数: 
1.    Hive自己如何确定reduce数: 
reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:
hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G) 
hive.exec.reducers.max(每个任务最大的reduce数,默认为999)
计算reducer数的公式很简单N=min(参数2,总输入数据量/参数1)
即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;
如:select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 
            /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04 总大小为9G多,因此这句有10个reduce
2.    调整reduce个数方法一: 
调整hive.exec.reducers.bytes.per.reducer参数的值;
set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 这次有20个reduce     
3.    调整reduce个数方法二; 
set mapred.reduce.tasks = 15;
select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt;这次有15个reduce
4.    reduce个数并不是越多越好; 
同map一样,启动和初始化reduce也会消耗时间和资源;
另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
5.    什么情况下只有一个reduce; 
很多时候你会发现任务中不管数据量多大,不管你有没有设置调整reduce个数的参数,任务中一直都只有一个reduce任务;
其实只有一个reduce任务的情况,除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外,还有以下原因:
a)    没有group by的汇总,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 写成 select count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04';
这点非常常见,希望大家尽量改写。
b)    用了Order by
c)    有笛卡尔积
通常这些情况下,除了找办法来变通和避免,我暂时没有什么好的办法,因为这些操作都是全局的,所以hadoop不得不用一个reduce去完成;
    同样的,在设置reduce个数的时候也需要考虑这两个原则:使大数据量利用合适的reduce数;使单个reduce任务处理合适的数据量;
6.增加maper
map多和少,reduce多和少的时间差异是很明显的,3个map和30个map的时间差异基本上就是10倍,这就是为什么我们竭尽全力的增加map数,而为了达到这个增加map数的目的,通常有两种途径,一种是尽量增加文件的大小,变成block_size 的N倍,还有一种办法是将大文件拆分为小文件,每个小文件进行处理的时候,都会启动一个map。前者的写法通常如下: create table cajeep_test2 as selecta.*,dummy_string(200) dummy_string from tdl_en_pp_node_stream_tmp2,通过增加的dummy_string的垃圾字段来增加数据文件的大小,从而实现将文件分成多个文件的作用;后者则需要set mapred.reduce.tasks = 30;set hive.merge.mapredfiles=false;,通过这两个设置,让系统每次启动reduce,都会启动30个,同时将reduce生成的文件不进行合并。因为每个reduce都会生成一个文件,光设置还不够,还需要让sql产生reduce的过程,而且是不能全局性的reduce(order by之类的),除非有group by之类能够保证产生多个reduce的代码之外,通常的代码后面都需要加上distribute by rand(123),让其自动产生一个reduce的过程,一旦产生reduce的过程,就会自动调用30个reduce,从而达到生成30个文件的目的

数据倾斜


1、join的key值发生倾斜,key值包含很多空值或是异常值

这种情况可以对异常值赋一个随机值来分散key

如:

select userid,name 

from user_info a

join(

    select case when userid is null then cast(rand(47)*100000 as int)

    else userid

    from user_read_log

) b on a.userid = b.userid 

通过rand函数将为null的值分散到不同的值上,在key值比较就能解决数据倾斜的问题

2、当key值都是有效值时,解决办法为设置以下几个参数
set hive.exec.reducers.bytes.per.reducer = 1000000000
也就是每个节点的reduce 默认是处理1G大小的数据,
【join 操作】也产生了数据倾斜,那么你可以在hive中设定
set hive.optimize.skewjoin = true; 
set hive.skewjoin.key = skew_key_threshold (default = 100000)
hive 在运行的时候没有办法判断哪个key 会产生多大的倾斜,所以使用这个参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的reduce, 一般可以设置成你
(处理的总记录数/reduce个数)的2-4倍都可以接受.倾斜是经常会存在的,一般select 的层数超过2层,翻译成执行计划多于3个以上的mapreduce job 都很容易产生倾斜,建议每次运行比较复杂的sql 之前都可以设一下这个参数. 如果你不知道设置多少,可以就按官方默认的1个reduce 只处理1G 的算法,那么skew_key_threshold = 1G/平均行长. 或者默认直接设成250000000 (差不多算平均行长4个字节)

3、reduce数太少
set mapred.reduce.tasks=800;
默认是先设置hive.exec.reducers.bytes.per.reducer这个参数,设置了后hive会自动计算reduce的个数,因此两个参数一般不同时使用
4、对于group by 和count(distinct)产生倾斜的问题
set hive.map.aggr=true (开启map端combiner); //在Map端做combiner,假如map各条数据基本上不一样, 聚合没什么意义,做combiner反而画蛇添足,hive里也考虑的比较周到通过参数hive.groupby.mapaggr.checkinterval = 100000 (默认)
hive.map.aggr.hash.min.reduction=0.5(默认)
两个参数的意思是:预先取100000条数据聚合,如果聚合后的条数/100000>0.5,则不再聚合
set hive.groupby.skewindata=true;//决定 group by 操作是否支持倾斜的数据。注意:只能对单个字段聚合.控制生成两个MR Job,第一个MR Job Map的输出结果随机分配到reduce做次预汇总,减少某些key值条数过多某些key条数过小造成的数据倾斜问题
5、小表与大表关联
此时,可以通过mapjoin来优化,
set hive.auto.convert.join = true ; //将小表刷入内存中  
set hive.mapjoin.smalltable.filesize = 2500000 ;//刷入内存表的大小(字节)  

6、group by  代替count(distinct )使用disticnt函数,所有的数据只会shuffle到一个reducer上,导致reducer数据倾斜严重。select count(*) 
from 
( select id 
  from a 
  group by ip 
) b 

7、join 经验/优化
数据倾斜,''值倾斜,加rand
select
    *
from
    log.logA a
left join
    dim.B b
        on 
            case
                when id <>  ''
                then id
                else concat('id_len0_', rand())
            end = cast(b.id as string)
        and b.id
        and a.dt = '20170925';


Hive 中关于 join 有很多优化。最有效的莫过于 mapjoin. 
1. 写 join on 的条件避免数据倾斜
2. 考虑设置参数 set hive.auto.convert.join.noconditionaltask.size=500000000; 默认是 10000000。取值大小视情况而定

8、count distinct(执行时间过长)
如果可以不写 distinct,尽量不要写
select distinct name from xxx
select name from xxx group by xxx.
统计超过两个字段的 count(distinct) 的值,无法再通过改写成 group by 来解决。
解决办法是:
    Hive:  设置一些优化参数
    Presto: 用近似函数 approx_distinct(误差 2% 以内)。一个真实的例子
[其他]set hive.exec.max.dynamic.partitions=10100;
set mapred.reduce.tasks=100;
set mapred.max.split.size=402653184;
set mapred.min.split.size.per.node=402653184;
set mapred.min.split.size.per.rack=402653184;
set hive.exec.dynamic.partition=true;
set hive.exec.parallel=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.parallel=true;
set hive.exec.reducers.bytes.per.reducer=512000000;
set hive.exec.reducers.max=3000;
set mapred.max.split.size=768000000;
set mapred.min.split.size.per.node=768000000;
set mapred.min.split.size.per.rack=768000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=100000000;
set hive.mergejob.maponly=true;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.smallfiles.avgsize=1024000000;
set mapred.map.child.java.opts="-Xmx4096m";
set mapreduce.map.memory.mb=6288;
set mapred.reduce.child.java.opts="-Xmx4096m";
set mapreduce.reduce.memory.mb=6288;


kylin cube 高级配置常用参数

kylin.source.hive.flat-table-cluster-by-column UUID
kylin.engine.mr.base-cuboid-config-override.mapred.map.child.java.opts -Xmx8g
kylin.engine.mr.base-cuboid-config-override.mapreduce.map.memory.mb 8500
kylin.engine.mr.config-override.mapred.reduce.child.java.opts -Xmx4g
kylin.engine.mr.config-override.mapreduce.reduce.memory.mb 4500
kylin.engine.mr.config-override.mapreduce.task.timeout 14400000

www.htsjk.Com true http://www.htsjk.com/hive/41174.html NewsArticle hive调优, hive大数据倾斜总结:点击链接 Hive的map和reduce数调整:原文链接 https://yq.aliyun.com/articles/59635 一、    控制hive任务中的map数:  1.    通常情况下,作业会通过input的目录产生一...
相关文章
    暂无相关文章
评论暂时关闭