Hive性能优化,
1 介绍
Hadoop的计算框架特性会衍生出哪些问题?
- 数量量大不是问题,数据倾斜是个问题
- jobs数比较多的作业运行效率相对比较低,比如即使有几百生的表,如果多次关联多次汇总,产生十几个jobs,耗时很长。原因是MapReduce作业初始化时间是比较长的
- sum,count,max,min等UDAF,不怕数据倾斜问题,hadoop在map端的汇总合并优化,使数据倾斜不成问题
- count(distinct),在数据量大的情况下,效率较低,如果是多count(distinct)效率更低,因为count(distinct)是按group by字段分组,按distinct字段排序,一般这种分布方式是很倾斜的。举个栗子:比如男uv,女uv,像淘宝一天30亿的pv,如果按性别分组,分配2个reduce,每个reduce处理15亿数据
面对这些问题,可通过下面的手段进行优化:
- 好的模型设计事半功倍
- 解决数据倾斜问题
- 减少job数
- 设置使用的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用150个reduce,那是相当的浪费,1个足够)
- 了解数据分布,手动解决数据倾斜问题。set hive.groupby.skewindata = true 这是通用的算法优化,但算法优化有时不能适应特定业务背景,开发人员了解业务,了解数据,可以通过业务逻辑精确有效地解决数据倾斜问题。
- 数据量较大的情况下,慎用count(distinct),count(distinct)容易产生倾斜问题
- 对小文件进行合并,是行之有效的提高效率的方法,假如所有的作业设置合理的文件数,对job的整体调度效率也会产生积极的正向影响。
- 优化时把握整体,单个作业最优不如整体最优
2 性能低下的根源
hive性能优化时,把HiveQL当做M/R程序来读,即从M/R的运行角度来考虑优化性能,从更底层思考如何优化运算性能,而不仅仅局限于逻辑代码的替换层面。
Hadoop的核心能力是partition和sort,因而这也是优化的根本。
Hadoop处理数据的过程,有几个显著的特征:
- 数据的大规模并不是负载重点,造成运行压力过大是因为运行数据的倾斜
- jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联对此汇总,产生几十个job,将会需要30分钟以上的时间且大部分时间将被用于作业分配,初始化和数据输出。M/R作业初始化的时间是比较耗时间资源的一个部分
- 在使用SUM,Count,max,min等UDAF函数时,不怕数据倾斜问题,Hadoop在Map端汇总合并优化过,使用数据倾斜不成问题
- count(distinct),在数据量大的情况下,效率较低,如果是多count(distinct)效率更低,因为count(distinct)是按group by字段分组,按distinct字段排序,一般这种分布方式是很倾斜的。举个栗子:比如男uv,女uv,像淘宝一天30亿的pv,如果按性别分组,分配2个reduce,每个reduce处理15亿数据
- 数据倾斜是导致次第大幅降低的主要原因,可以采用多一镒Map/Reduce的方法,避免倾斜
最后得出的结论:避实就虚,用job数的增加,输入量的增加,占用更多存储空间,充分利用空闲CPU等各种方法,分解数据倾斜造成的负担
3 配置角度优化
知道了性能低下的根源,同样也可以从Hive的配置解读去优化。Hive系统内部已针对不同的查询预设定了优化方法,用记可以通过调整配置进行控制,以下举例介绍部分优化的策略以及优化控制选项。
3.1 列裁剪
Hive在读数据的时候,可以只读取查询中需要用到的列,而忽略其它列。例如,若有以下查询:
SELECT a,b FROM q WHERE e<10;
在实话此项查询中,Q表有5列(a, b, c, d, e),Hive只读取查询逻辑中真实需要的3列a、b、e,而忽略c,d;这样做节省了读取开销,中间表存储开销和数据整合开销。
裁剪所对应的参数项为:hive.optimize.cp = true(默认值为值)
3.2 分区裁剪
可以在查询的过程中减少不必要的分区。例如,若有以下查询:
SELECT * FROM (SELECTT a1,COUNT(1) FROM T GROUP BY a1) subq WHERE subq.prtn=100;
SELECT * FROM T1 JOIN (SELECT * FROM T2) subq ON (T1.a1=subq.a2) WHERE subq.prtn=100;
查询语句若将“subq.prtn = 100”条件放入子查询中更为高效,可以减少读入的分区数目,Hive自动执行这种裁剪优化
分区参数为:hive.optimize.pruner = true(默认值为true)
3.3 Join原则
在使用写有Join操作的查询语句时有一条原则:应该将条止少的表/子查询放在Join操作符的左边。原因在Join操作的Reduce阶段,位于Join操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生OOM错误的几率。对于一条语句有多个Join情况,如果Join的条件相同,比如:
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);
- 如果Join的Key相同,不管有多少个表,都会合并为一个MapReduce
- 一个MapReduce任务,而不是 n 个
- 在做 outer join 的时候也是一样
如果 Join 的条件不相同,比如:
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x on (u.age = x.age);
MapReduce的任务数目和Join操作的数目是对应的,上述查询和以下查询是等价的:
SELECT * FROM page_view p JOIN user u ON (pv.userid = u.userid);
SELECT x.pageid, x.age FROM tmptable x JOIN newuser y ON (x.age = y.age);
3.4 map jion操作
Join操作在Map阶段完成,不再需要Reduce,前提条件是需要的数据在Map的过程中可以访问到。比如查询:
SELECT /*+ MAPJOIN(pv) */ pv.pageid, u.age
FROM page_view pv
JOIN user u ON (pv.userid = u.userid);
可以在Map阶段完成Join,相关参数为:
- hive.join.emit.interval = 1000
- hive.mapjoin.size.key = 10000
- hive.mapjoin.cache.numrows = 10000
3.5 group by操作
进行group by操作时需要注意以下几点:
- Map端部分聚合
事实上并不是所有的聚合操作都需要在reduce部分进行,很多聚合操作都可以先在Map端进行部分聚合,然后reduce端得出最终结果。这里需要修改的参数为:
hive.map.aggr = true (用于设定是否在在Map端进行聚合,默认值为真)
hive.groupby.mapaggr.checkinterval = 100000(用于设定map端进行聚合操作的条目数)
- 有数据倾斜时进行负载均衡
此处需要设定 hive.groupby.skewindata,当选项设定为 true 时,生成的查询计划有2个MapReduce任务。在第一个MapReduce中,map的输出结果集合会随机分布到reduce中,每个reduce做部分聚合操作,并输出结果。这样处理的结果是,相同的group by key有有可能分发到不同的reduce中,从而达到负载均衡的目的;第二个MapReduce任务再根据预处理的数据结果按照group by key分布到reduce中(这个过程可以保证相同的group by key 分布到同一个 reduce中),最后完成最终的聚合操作。
3.6 合并小文件
文件数量多,容易在文件存储端造成瓶颈,给HDFS带来压力,影响处理效率。对此,可以通过合并Map 和Reduce 的结果文件来消除这样的影响。
用于设置合并属性的参数有:
-
是否合并Map输出文件:hive.merge.mapfiles=true(默认值为真)
-
是否合并Reduce 端输出文件:hive.merge.mapredfiles=false(默认值为假)
-
合并文件的大小:hive.merge.size.per.task=256*1000*1000(默认值为 256000000)
4 程序角度优化
4.1 熟练使用SQL提高查询
熟练地使用SQL,能写出高效率的查询语句
场景:有一张user表,为卖家每天收到的表,user_id,ds(日期)为key,属性有主营类目,指标有交易金额,交易笔数。每天要取前10天的总收入、总笔数和最近一天的主营类目。
解决方法 1
如下所示:常用方法
INSERT OVERWRITE TABLE t1
SELECT user_id,substr(MAX(CONCAT(ds,cat),9) AS main_cat) FROM users
WHERE ds=20190329 --20190329 为日期列的值,实际代码中可以用函数表示出当天日期
GROUP BY ser_id;
INSERT OVERWRITE TABLE t2
SELECT user_id,sum(qty) AS qty,SUM(amt) AS amt FROM users
WHERE ds BETWEEN 20120301 AND 20120329
GROUP BY user_id
SELECT t1.user_id,t1.main_cat,t2.qty,t2.amt FROM t1
JOIN t2 ON t1.user_id=t2.user_id
下面给出方法1的思路,实现步骤如下:
- 1)利用分析函数,取每个user_id最近一天的主营类目,存入临时表 t1
- 2)汇总10天的总交易金额、交易笔数,存入临时表t2
- 3)关联t1,t2,得到最终的结果
解决方法 2
如下所示:优化方法
SELECT user_id,substr(MAX(CONCAT(ds,cat)),9) AS main_cat,SUM(qty),SUM(amt) FROM users
WHERE ds BETWEEN 20190301 AND 20190329
GROUP BY user_id
在工作中总结出:方案 2 的开销等于方案 1 的第二步的开销,性能提升,由原有的 25分钟完成,缩短为 10 分钟以内完成。节省了两个临时表的读定是一个关键原因,这种方式也适用于Oracle中的数据查找工作。
SQL 具有普遍适用性,很SQL通用的优化方案在Hadoop分布式计算方式中也可以达到效果
4.2 无效ID在关系时的数据倾斜问题
问题:日志中常会出现信息丢失,比如每日约为20亿的全网日志,其中的 user_id 为主键,在日志收集过程中会丢失,出现主键为 null 的情况,如果取其中的 user_id 和 bmw_users 关联,就会碰到数据倾斜的问题。原因是Hive中,主键为null值的项会被当估相同的key而分配进同一个计算Map。
解决方法 1:user_id为空的不参与关联,子查询过滤null
SELECT * FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id=b.user_id
UNION All SELECT * FROM log a WHERE a.user_id IS NULL
解决方法 2:函数过滤null
SELECT * FROM log a
LEFT OUTER JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT(‘dp_hive’,RAND()) ELSE a.user_id END =b.user_id;
在工作中总结出:解决方法 2 比解决方法 1 效果更好,不但 IO 少了,而作业数也少了。解决方法 1 中log读取2次, job数为2。解决方法2中job数是1。这个优化适合无效 id(比如-99、‘’、null等)产生的倾斜问题。把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,从而解决数据倾斜问题。因为空值不参与关联,即使分到不同的reduce上,也不会影响最终的结果。
Hadoop通用关联的实现方法是:关联通过二次排序实现的,关联的列为 partition key,关联的列和表的 tag 组成排序的 group key,根据 partition key 分配 reduce,同一 reduce 内根据 group key 排序。
4.3 不同数据类型关联产生的倾斜问题
问题:不同数据类型 id 的关联会产生数据倾斜问题
一张表 s8 的日志,每个端口一条记录,要和端口表关联,但关联却碰到倾斜的问题。s8 的日志中有 32 位字符串商品 id,也有数值商品 id,日志中类型是 string 的,但商品中的数值 id 是 bigint 的。猜想问题的原因是把 s8 的商品 id 转成数值 id 做 hash来分配 reduce,所以字符串 id 的s8 日志,都分配到一个reduce上了,解决的方法验证了这个猜测。
解决方法:把数据类型转换成字符串类型
SELECT * FROM s8_log a
LEFT OUTERJOIN r_auction_auctions b
ON a.auction_id= CAST(b.auction_id AS STRING)
4.4 利用 Hive 对 union all 优化的特性
多表 union all 会优化成一个 job
问题:比如推广效果表要和商品表关联,效果表中的 auction_id列既有 32 位字符串商品 id,也有数字 id,和商品表关联得到商品的信息
SELECT * FROM effect a
JOIN
(SELECT auction_id AS auction_id FROM auctions
UNION All
SELECT auction_string_id AS auction_id FROM auctions) b
ON a.auction_id=b.auction_id
比分别过滤数字id,字符串id然后分别和商品表关联性能要好。
这样写的好处:1 个MapReduce 作业,商品表只读一次,推广效果表只读取一次。把这个 SQL 换成 Map/Reduce 代码的话,Map 的时候,把 a 表的记录打上标签 a,商品表记录每读取一条,打上标签 b,变成两个 <key, value>对,分别为<(b,数字id), value>,<(b, 字符串id), value>
4.5 解决 Hive 对 Union all 优化的短板
Hive 对 Union all 的优化的特性:对 union all 优化只局限于非嵌套查询
- 消灭子查询的 group by
示例1:子查询内有 group by
SELECT * FROM
(SELECT * FROM t1 GROUP BY c1,c2,c3 UNION ALL SELECT * FROM t2 GROUP BY c1,c2,c3)t3
GROUP BY c1,c2,c3
从业务逻辑上说,子查询内的 group by 显得多余(功能上的多余,除非有 count(distinct)),如果不是因为 Hive Bug 或者性能上的考量(曾经出现如果不执行子查询 group by,数据得不到正确的结果的 Hive Bug)。所以这个 Hive 按经验转换成如下:
SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2)t3 GROUP BY c1,c2,c3
调优结果:经过测试,并未出现 union all 的Hive Bug,数据是一致的。MapReduce的作业数由 3 减少到 1。
t1 相当于一个目录,t2 相当于一个目录,对 MapReduce 程序来说,t1、t2 可以作为 MapReduce 作业的 multi inputs,这可以通过一个 MapReduce 来解决问题。Hadoop 的计算框架,不怕数据多,就怕作业数多。
但如果换成是其它计算平台如Oracle,那就不一定了,因为把大的输入拆成两个输入,分别排序汇总后 merge(假如两个子排序是并行的话),是有可能性能更优的(比如希尔排序比冒泡排序的性能更优)
- 消灭子查询的 count(distinct),max,min
SELECT * FROM
(SELECT * FROM t1
UNION ALL SELECT c1,c2,c3 COUNT(DISTINCT c4) FROM t2 GROUP BY c1,c2,c3) t3
GROUP BY c1,c2,c3;
由于子查询里面有 count(distinct)操作,直接去 group by 将达不到业务目标。这时采用临时表消灭 count(distinct) 作业不但能解决倾斜问题,还能有效减少 jobs。
INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3;
SELECT c1,c2,c3,SUM(income),SUM(uv) FROM
(SELECT c1,c2,c3,income,0 AS uv FROM t1
UNION ALL
SELECT c1,c2,c3,0 AS income,1 AS uv FROM t2) t3
GROUP BY c1,c2,c3;
job 数是 2,减少一半,而且两次 MapReduce 比 count(distinct) 效率更高
- 消灭子查询内的 join
SELECT * FROM
(SELECT * FROM t1 UNION ALL SELECT * FROM t4 UNION ALL SELECT * FROM t2 JOIN t3 ON t2.id=t3.id) x
GROUP BY c1,c2;
上面代码运行会有 5 个 jobs。假如先 join 生成临时表 t5,然后 union all,会变成 2 个 jobs。
INSERT OVERWRITE TABLE t5
SELECT * FROM t2 JOIN t3 ON t2.id=t3.id;
SELECT * FROM (t1 UNION ALL t4 UNION ALL t5);
4.6 group by 替代 count(distinct)达到优化效果
计算 uv 的时候,经常会用 count(distinct),但在数据比较倾斜的时候 count(distinct) 会比较慢,此时可以尝试用 group by 改写代码计算 uv。
原有代码:
SELECT 20190329 AS thedate,adzoneid,COUNT(DISTINCT acookie) AS uv FROM s_ods_log_tanx_pv t
WHERE t.ds=20190329 GROUP BY adzoneid
关于 count(distinct)的数据倾斜问题不能一概而论,要依情况而定。
SELECT COUNT(1) FROM
(SELECT DISTINCT acookie FROM s_ods_log_tanx_pv where ds = 20120329) tmp
5 优化的常用手段
5.1 主要由三个属性来决定
- hive.exec.reducers.bytes.per.reducer:这个参数控制一个 job 会有多少个 reducer 来处理,依据的是输入文件的总大小。默认1GB
- hive.exec.reducers.max:这个参数控制最大的 reducer 的数量,如果 input.bytes.per.reduce > max 则会启动这个参数所指定的 reduce 个数。这个并不会影响 mapre.reduce.tasks 参数的设置。默认的 max 是 999。
- mapred.reduce.tasks:这个参数如果指定了,hive就不会用它的 estimation 函数来自动计算 reduce 的个数,而是用这个参数来启动 reducer。默认是 -1。
5.2 参数设置的影响
- reducer 太少:如果数据量很大,会导致这个 reduce 异常的慢,从而导致这个任务不能结束,也有可能会 OOM
- reducer 太多:产生的小文件太多,合并起来代价太高,namenode的内存占用也会增大
如果不指定mapred.reduce.tasks,hive会自动计算需要多少个 reducer。