Data Warehouse Hive: Functions and Hive Tuning
Contents
- V. Functions
- 1. Built-in functions
- 1.1 List the built-in functions
- 1.2 Show the usage of a built-in function
- 1.3 Show the detailed usage of a built-in function
- 2. User-defined functions
- 3. UDF development example (toLowerCase())
- 3.1 Setting up the environment
- 3.2 Writing the code: define a single input parameter
- 3.3 Packaging and loading into the test environment
- 3.4 Creating a temporary function bound to the Java class
- 3.5 Using the function
- VI. Enterprise-Level Tuning
- 1 Fetch tasks
- 2 Local mode
- 3 Table optimizations
- 3.1 Small-table/big-table joins
- Goal: compare the efficiency of joining in each order
- 3.1.1 Create the big table, the small table, and the join table
- 3.1.2 Load data into the big and small tables
- 3.1.3 Disable mapjoin (enabled by default)
- 3.1.4 Small table join big table
- 3.1.5 Big table join small table
- 3.2 MapJoin
- 3.2.1 When MapJoin applies
- 3.2.2 MapJoin parameters
- 3.2.3 Re-running the join tests
- 3.2.4 How MapJoin works
- 3.3 Group By
- 3.3.1 Enabling map-side aggregation
- 3.4 Count(Distinct) deduplication
- Example:
- 3.4.1 Create a table
- 3.4.2 Load the data
- 3.4.3 Set the number of reducers
- 3.4.4 Run the distinct query
- 3.4.5 Deduplicate ids with Group By
V. Functions
1. Built-in functions
1.1 List the built-in functions
hive> show functions;
1.2 Show the usage of a built-in function
hive> desc function upper;
OK
upper(str) - Returns str with all characters changed to uppercase
Time taken: 0.055 seconds, Fetched: 1 row(s)
1.3 Show the detailed usage of a built-in function
hive> desc function extended upper;
OK
upper(str) - Returns str with all characters changed to uppercase
Synonyms: ucase
Example:
> SELECT upper('Facebook') FROM src LIMIT 1;
'FACEBOOK'
Time taken: 0.07 seconds, Fetched: 5 row(s)
2. User-defined functions
(1) Hive ships with built-in functions such as max/min, but their number is limited; you can extend Hive conveniently by writing your own UDFs.
(2) User-defined functions come in three kinds:
UDF: one row in, one row out
UDAF: aggregate functions (many rows in, one row out)
UDTF: one row in, many rows out
3. UDF development example (toLowerCase())
3.1 Setting up the environment
Create a new module and copy the jars from hive/lib onto its classpath, or pull in the dependency with Maven:
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.0</version>
</dependency>
3.2 Writing the code: define a single input parameter
package udf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class myudf extends UDF {
    // Called once per row: returns the input lower-cased,
    // or null for null/blank input.
    public Text evaluate(Text str) {
        if (str == null) return null;
        if (StringUtils.isBlank(str.toString())) return null;
        return new Text(str.toString().toLowerCase());
    }
}
3.3 Packaging and loading into the test environment
hive> add jar /home/centos01/modules/apache-hive-1.2.2-bin/iotmp/jar/Hadoop_Test-0.0.1-SNAPSHOT.jar;
Added [/home/centos01/modules/apache-hive-1.2.2-bin/iotmp/jar/Hadoop_Test-0.0.1-SNAPSHOT.jar] to class path
Added resources: [/home/centos01/modules/apache-hive-1.2.2-bin/iotmp/jar/Hadoop_Test-0.0.1-SNAPSHOT.jar]
3.4 Creating a temporary function bound to the Java class
hive> create temporary function udf_lower as "udf.myudf";
OK
Time taken: 1.095 seconds
3.5 Using the function
hive> select name,udf_lower(name) lowername from stu_stuck;
OK
Joe 21 joe 21
Hank 22 hank 22
Tab 41 tab 41
Iuh 27 iuh 27
Jkh 74 jkh 74
Ray 21 ray 21
Rank 47 rank 47
Poy 48 poy 48
Tuq 32 tuq 32
Qwe 52 qwe 52
Loop 10 loop 10
Joe 21 joe 21
Hank 22 hank 22
Tab 41 tab 41
Iuh 27 iuh 27
Jkh 74 jkh 74
Ray 21 ray 21
Rank 47 rank 47
Poy 48 poy 48
Tuq 32 tuq 32
Qwe 52 qwe 52
Loop 10 loop 10
Loop 10 loop 10
Qwe 52 qwe 52
Tuq 32 tuq 32
Poy 48 poy 48
Rank 47 rank 47
Ray 21 ray 21
Jkh 74 jkh 74
Iuh 27 iuh 27
Tab 41 tab 41
Hank 22 hank 22
Joe 21 joe 21
Time taken: 2.934 seconds, Fetched: 33 row(s)
VI. Enterprise-Level Tuning
1 Fetch tasks
A fetch task means that for certain queries Hive can skip MapReduce entirely. For example:
select * from stu;
Hive can simply read the files under stu's storage directory and print the results to the console.
Set hive.fetch.task.conversion to more (it defaults from hive-default.xml; overrides belong in hive-site.xml) and full-table selects, column projections, and limit queries all bypass MapReduce. Set it to none and every query runs through MR:
<property>
<name>hive.fetch.task.conversion</name>
<value>more</value>
<description>
Expects one of [none, minimal, more].
Some select queries can be converted to single FETCH task minimizing latency.
Currently the query should be single sourced not having any subquery and should not have
any aggregations or distincts (which incurs RS), lateral views and joins.
0. none : disable hive.fetch.task.conversion
1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
</description>
</property>
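Instead of editing the XML, the same switch can also be flipped per session from the Hive CLI (a sketch; a session-level set only affects the current session):

```sql
-- session-level override; takes effect immediately, no restart needed
set hive.fetch.task.conversion=more;
-- a plain projection/limit query now returns without launching MapReduce
select * from stu limit 3;
```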
2 本地模式
大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据的,不过,有时Hive的输入数据量是非常小的,在这种情况下,为了查询触发执行任务,消耗的时间可能会比job实际执行之间要多,对于大多数这种情况,Hive可以通过本地模式,在单台机器上处理所有所有任务,对于小数据集,执行时间可以被缩短。
用户可以通过设定hive.exec.mode.local.auto为true,让Hive在适当的时候自动进行优化。
而hive.exec.mode.local.auto.inputbytes.max可以设定最大数据输入量,当小于value的时候就采用local mr,大于就是分布式 mr.hive.exec.mode.local.auto.input.files.max可以设置最大输入文件量,小于value的时候采用local mr,大于的时候就是用分布式mr。
采用本地模式主要是节约不必要浪费的时间。
<property>
<name>hive.exec.mode.local.auto</name>
<value>true</value>
<description>Let Hive determine whether to run in local mode automatically</description>
</property>
<property>
<name>hive.exec.mode.local.auto.inputbytes.max</name>
<value>134217728</value>
<description>When hive.exec.mode.local.auto is true, input bytes should less than this for local mode.</description>
</property>
<property>
<name>hive.exec.mode.local.auto.input.files.max</name>
<value>4</value>
<description>When hive.exec.mode.local.auto is true, the number of tasks should less than this for local mode.</description>
</property>
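The same three properties can be set per session from the CLI (a sketch; the values mirror the XML defaults above):

```sql
set hive.exec.mode.local.auto=true;                     -- enable automatic local mode
set hive.exec.mode.local.auto.inputbytes.max=134217728; -- 128 MB input-size cap
set hive.exec.mode.local.auto.input.files.max=4;        -- input file-count cap
```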
3 Table optimizations
3.1 Small-table/big-table joins
Put the table whose keys are relatively scattered and whose data volume is small on the left side of the join; this effectively reduces the chance of out-of-memory errors. Going further, a map join can load small dimension tables (under roughly 1,000 rows) into memory, completing the join on the map side.
Practical testing shows that recent Hive versions have already optimized both small-join-big and big-join-small, so whether the small table sits on the left or right no longer makes a measurable difference.
The following test verifies this by timing joins between a big table (2.84M rows) and a small table (1.7M rows) in both orders.
Goal: compare the efficiency of joining in each order
3.1.1 Create the big table, the small table, and the join table
hive> create table if not exists bigTable(id int,sex int,money bigint,star int)row format delimited fields terminated by ',';
OK
Time taken: 0.069 seconds
hive> create table if not exists smallTable(id int,sex int,money bigint,star int)row format delimited fields terminated by ',';
OK
Time taken: 1.342 seconds
hive> create table if not exists joinTable(id int,sex int,money bigint,star int)row format delimited fields terminated by ',';
OK
Time taken: 0.214 seconds
3.1.2 Load data into the big and small tables
hive> load data local inpath '/home/centos01/data/user_profile_table.csv' into table bigTable;
Loading data to table default.bigtable
Table default.bigtable stats: [numFiles=1, totalSize=528507]
OK
Time taken: 1.626 seconds
hive> load data local inpath '/home/centos01/data/user_profile_table_small.csv' into table smallTable;
Loading data to table default.smalltable
Table default.smalltable stats: [numFiles=1, totalSize=879]
OK
Time taken: 0.192 seconds
3.1.3 Disable mapjoin (enabled by default)
hive> set hive.auto.convert.join = false;
3.1.4 Small table join big table
hive> insert overwrite table jointable select b.id,b.sex,b.money,b.star from smalltable s left join bigTable b on s.id=b.id;
Query ID = centos01_20190228134754_36c5ff19-24d9-4a02-9278-80f4859f284d
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-02-28 13:48:01,371 Stage-1 map = 0%, reduce = 0%
2019-02-28 13:48:02,383 Stage-1 map = 100%, reduce = 0%
2019-02-28 13:48:04,405 Stage-1 map = 100%, reduce = 100%
Ended Job = job_local1983401461_0001
Loading data to table default.jointable
Table default.jointable stats: [numFiles=1, numRows=48, totalSize=831, rawDataSize=783]
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 1587279 HDFS Write: 1589063 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 9.991 seconds
3.1.5 Big table join small table
hive> insert overwrite table jointable select b.id,b.sex,b.money,b.star from smalltable s right join bigTable b on s.id=b.id;
Query ID = centos01_20190228135406_e1cdaa32-9b5b-44ea-b449-41d577544e50
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-02-28 13:54:08,681 Stage-1 map = 100%, reduce = 0%
2019-02-28 13:54:11,729 Stage-1 map = 100%, reduce = 100%
Ended Job = job_local630207992_0002
Loading data to table default.jointable
Table default.jointable stats: [numFiles=1, numRows=28041, totalSize=500466, rawDataSize=472425]
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 3175659 HDFS Write: 2091419 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 5.636 seconds
3.2 MapJoin
3.2.1 When MapJoin applies
MapJoin is typically used to combat data skew. Suppose a table holds 10,000 rows and 5,000 of them share the same key while the rest are scattered evenly: that one key creates severe skew. MapJoin avoids it by reading the small table entirely into memory and matching the other table's rows against it directly in the map phase. Because the join happens in the map tasks, the reduce phase is skipped and throughput is much higher.
MapJoin has a second use as well: it is not limited to equality conditions like s.id=b.id from 3.1, but also handles conditions such as s.id>b.id. Without MapJoin, such a non-equi join degenerates into a Cartesian product in MapReduce and runs extremely slowly; with MapJoin the speedup is dramatic.
In summary, MapJoin applies when:
1. One table in the join is very small, and identical keys cause data skew
2. The join condition is a non-equi condition
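Besides the automatic conversion configured in 3.2.2, older Hive releases also accept an explicit query hint naming the table to hold in memory (a sketch; on recent versions the hint is ignored unless hive.ignore.mapjoin.hint is set to false):

```sql
select /*+ MAPJOIN(s) */ b.id, b.sex, b.money, b.star
from smalltable s join bigTable b on s.id = b.id;
```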
3.2.2 MapJoin parameters
# Let Hive convert joins to MapJoin automatically
hive> set hive.auto.convert.join=true;
# Size threshold below which a table counts as small (default 25MB)
hive> set hive.mapjoin.smalltable.filesize=25000000;
3.2.3 Re-running the join tests
hive> insert overwrite table jointable select b.id,b.sex,b.money,b.star from smalltable s right join bigTable b on s.id=b.id;
Query ID = centos01_20190228145854_8d4a1211-1953-4df0-9bf3-5b5dfa929b80
Total jobs = 1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/centos01/modules/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/centos01/modules/hbase-1.3.1/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-02-28 14:59:09,223 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Execution log at: /tmp/centos01/centos01_20190228145854_8d4a1211-1953-4df0-9bf3-5b5dfa929b80.log
2019-02-28 14:59:13 Starting to launch local task to process map join; maximum memory = 477626368
2019-02-28 14:59:16 Dump the side-table for tag: 0 with group count: 48 into file: file:/tmp/centos01/c3d78c64-7c3e-4145-b87b-a2ea95f3b6fd/hive_2019-02-28_14-58-54_950_2207089608769530710-1/-local-10002/HashTable-Stage-4/MapJoin-mapfile00--.hashtable
2019-02-28 14:59:16 Uploaded 1 File to: file:/tmp/centos01/c3d78c64-7c3e-4145-b87b-a2ea95f3b6fd/hive_2019-02-28_14-58-54_950_2207089608769530710-1/-local-10002/HashTable-Stage-4/MapJoin-mapfile00--.hashtable (1223 bytes)
2019-02-28 14:59:16 End of local task; Time Taken: 3.269 sec.
Execution completed successfully
MapredLocal task succeeded
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2019-02-28 14:59:23,843 Stage-4 map = 0%, reduce = 0%
2019-02-28 14:59:24,846 Stage-4 map = 100%, reduce = 0%
Ended Job = job_local2131229300_0006
Loading data to table default.jointable
Table default.jointable stats: [numFiles=1, numRows=28041, totalSize=500466, rawDataSize=472425]
MapReduce Jobs Launched:
Stage-Stage-4: HDFS Read: 2617285 HDFS Write: 1531383 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 30.767 seconds
hive> insert overwrite table jointable select b.id,b.sex,b.money,b.star from smalltable s left join bigTable b on s.id=b.id;
Query ID = centos01_20190228145941_3e1c8c4e-ead8-4e4f-8f40-e60f0edb74cb
Total jobs = 1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/centos01/modules/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/centos01/modules/hbase-1.3.1/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-02-28 14:59:53,185 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Execution log at: /tmp/centos01/centos01_20190228145941_3e1c8c4e-ead8-4e4f-8f40-e60f0edb74cb.log
2019-02-28 14:59:54 Starting to launch local task to process map join; maximum memory = 477626368
2019-02-28 14:59:58 Dump the side-table for tag: 1 with group count: 28041 into file: file:/tmp/centos01/c3d78c64-7c3e-4145-b87b-a2ea95f3b6fd/hive_2019-02-28_14-59-41_278_4846168118688373703-1/-local-10002/HashTable-Stage-4/MapJoin-mapfile11--.hashtable
2019-02-28 14:59:58 Uploaded 1 File to: file:/tmp/centos01/c3d78c64-7c3e-4145-b87b-a2ea95f3b6fd/hive_2019-02-28_14-59-41_278_4846168118688373703-1/-local-10002/HashTable-Stage-4/MapJoin-mapfile11--.hashtable (760678 bytes)
2019-02-28 14:59:58 End of local task; Time Taken: 3.52 sec.
Execution completed successfully
MapredLocal task succeeded
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2019-02-28 15:00:00,758 Stage-4 map = 100%, reduce = 0%
Ended Job = job_local733804635_0007
Loading data to table default.jointable
Table default.jointable stats: [numFiles=1, numRows=48, totalSize=831, rawDataSize=783]
MapReduce Jobs Launched:
Stage-Stage-4: HDFS Read: 2618244 HDFS Write: 1532288 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 19.796 seconds
3.2.4 How MapJoin works
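The mechanism can be sketched in plain Java (a simplified model, not Hive's actual implementation): the small table becomes an in-memory hash map, and each big-table row is probed against it as it streams through a mapper, so no shuffle or reduce phase is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of a map-side hash join: the small table is the
// "side table" Hive dumps into a hashtable file and ships to every mapper.
public class MapJoinSketch {
    // smallRows maps join key -> payload of the small table.
    // bigRows are streamed one at a time, as a mapper would see them.
    public static List<String> join(Map<Integer, String> smallRows,
                                    List<int[]> bigRows) {
        List<String> out = new ArrayList<>();
        for (int[] row : bigRows) {
            String match = smallRows.get(row[0]); // probe by join key
            if (match != null) {
                out.add(row[0] + "," + row[1] + "," + match);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> small = new HashMap<>();
        small.put(1, "s1");
        small.put(2, "s2");
        List<int[]> big = new ArrayList<>();
        big.add(new int[]{1, 100});
        big.add(new int[]{3, 300});
        big.add(new int[]{2, 200});
        System.out.println(join(small, big)); // [1,100,s1, 2,200,s2]
    }
}
```

This is also why the transcripts above show a "local task" first: Hive builds exactly such a hashtable from the small table before the map-only job starts.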
3.3 Group By
When writing plain MapReduce we usually send all map output to the reducers to perform the aggregation, but when a single key carries a large share of the data this causes skew. Hive can instead perform part of the aggregation in the map phase and let the reducers produce only the final result.
3.3.1 Enabling map-side aggregation
# Aggregate on the map side
hive> set hive.map.aggr=true;
# Number of input rows after which the map-side aggregation buffer is checked
hive> set hive.groupby.mapaggr.checkinterval=100000;
# Load-balance when the data is skewed
hive> set hive.groupby.skewindata=true;
Note: with the last option set to true, the query plan contains two MR jobs. In the first job the map output is distributed randomly across the reducers; each reducer performs a partial aggregation and emits its result, so rows with the same key are spread over different reducers and the load is balanced. The second job then distributes these partial results by the group-by key so the final aggregation can be completed.
3.4 Count(Distinct) deduplication
A global distinct count is executed by a single Reduce task; when that one reducer has to process too much data, the whole job struggles. Count(Distinct) is therefore usually replaced by a Group By followed by a Count.
Example:
3.4.1 Create a table
hive> create table if not exists balance_table(user_id bigint,report_date bigint,tBalance bigint,yBalance bigint,total_purchase_amt bigint,direct_purchase_amt bigint,purchase_bal_amt float,purchase_bank_amt float,total_redeem_amt float,consume_amt float,transfer_amt float,tftobal_amt float,tftocard_amt float,share_amt float,category1 float,category2 float,category3 float,category4 float) row format delimited fields terminated by ',';
OK
Time taken: 0.237 seconds
3.4.2 Load the data
hive> load data local inpath '/home/centos01/data/user_balance_table.csv' into table balance_table;
Loading data to table default.balance_table
Table default.balance_table stats: [numFiles=1, totalSize=157760971]
OK
Time taken: 9.235 seconds
3.4.3 Set the number of reducers
hive> set mapreduce.job.reduces=5;
3.4.4 Run the distinct query
hive> select count(distinct user_id) from balance_table;
Query ID = centos01_20190228170321_938c3469-4ec3-40fe-b1e5-7709cdebbb81
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Defaulting to jobconf value of: 5
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-02-28 17:03:25,285 Stage-1 map = 0%, reduce = 0%
2019-02-28 17:03:44,716 Stage-1 map = 57%, reduce = 0%
2019-02-28 17:03:46,726 Stage-1 map = 100%, reduce = 0%
2019-02-28 17:03:47,729 Stage-1 map = 100%, reduce = 100%
Ended Job = job_local203175379_0008
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-02-28 17:03:50,212 Stage-2 map = 100%, reduce = 100%
Ended Job = job_local1809588331_0009
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 962300310 HDFS Write: 955759554 SUCCESS
Stage-Stage-2: HDFS Read: 320766770 HDFS Write: 318586518 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
28041
Time taken: 28.705 seconds, Fetched: 1 row(s)
3.4.5 Deduplicate ids with Group By
hive> select count(user_id) from (select user_id from balance_table group by user_id)a;
Query ID = centos01_20190228170632_67dc5bc0-33c0-45b4-ba62-36a16fa0041f
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks not specified. Defaulting to jobconf value of: 5
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-02-28 17:06:33,376 Stage-1 map = 0%, reduce = 0%
2019-02-28 17:06:38,867 Stage-1 map = 57%, reduce = 0%
2019-02-28 17:06:39,877 Stage-1 map = 100%, reduce = 100%
Ended Job = job_local909823916_0010
Launching Job 2 out of 3
Number of reduce tasks not specified. Defaulting to jobconf value of: 5
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-02-28 17:06:41,624 Stage-2 map = 100%, reduce = 100%
Ended Job = job_local364978170_0011
Launching Job 3 out of 3
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2019-02-28 17:06:42,987 Stage-3 map = 100%, reduce = 100%
Ended Job = job_local484914606_0012
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 1908890712 HDFS Write: 955759554 SUCCESS
Stage-Stage-2: HDFS Read: 1908890712 HDFS Write: 955759554 SUCCESS
Stage-Stage-3: HDFS Read: 636296904 HDFS Write: 318586518 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
28041
Time taken: 10.947 seconds, Fetched: 1 row(s)
As the timings show (10.9s versus 28.7s, despite launching one more job), the Group By rewrite is definitely worth it when the data volume is large.