hive SQL优化之distribute by和sort by,hivedistribute
最近在优化hiveSQL,下面是一段排序,分组后取每组第一行记录的SQL
-
INSERT
OVERWRITE pt='${SRCTIME}')TABLE t_wa_funnel_distinct_temp PARTITION ( -
SELECT
-
bussiness_id, -
cookie_id, -
session_id, -
funnel_id, -
group_first(funnel_name) funnel_name, -
step_id, -
group_first(step_name) step_name, -
group_first(log_type) log_type, -
group_first(url_pattern) url_pattern, -
group_first(url) url, -
group_first(refer) refer, -
group_first(log_time) log_time, -
group_first(is_new_visitor) is_new_visitor, -
group_first(is_mobile_traffic) is_mobile_traffic, -
group_first(is_bounce) is_bounce, -
group_first(campaign_name) campaign_name, -
group_first(group_name) group_name, -
group_first(slot_name) slot_name, -
group_first(source_type) source_type, -
group_first(next_page) next_page, -
group_first(continent) continent, -
group_first(sub_continent_region) sub_continent_region, -
group_first(country) country, -
group_first(region) region, -
group_first(city) city, -
group_first(language) language, -
group_first(browser) browser, -
group_first(os) os, -
group_first(screen_color) screen_color, -
group_first(screen_resolution) screen_resolution, -
group_first(flash_version) flash_version, -
group_first(java) java, -
group_first(host) host -
FROM
-
(
SELECT* -
FROM r_wa_funnel -
WHERE pt='${SRCTIME}' -
ORDER BY bussiness_id, cookie_id, session_id, funnel_id, step_id, log_time ASC -
)
t1 -
GROUP
BY pt, bussiness_id, cookie_id, session_id, funnel_id, step_id;
group_first: 自定义函数,用户取每组第一个字段
${SRCTIME}:
由外部oozie调度传入, 作为时间分区,精确到小时.eg: 2011.11.01.21
下面在hive上以SRCTIME = 2011.11.01.21
执行以上SQL. 2011.11.01.21小时分区记录数有10435486
执行时间:
从上面可以看出,reduce阶段只有一个reduce, 这是因为ORDER
从业务需求来看, 只要按
OK, 这样可以采用hive提供的distribute by 和 sort by,这样可以充分利用hadoop资源, 在多个
reduce中局部按log_time 排序
优化有的hive代码:
-
INSERT
OVERWRITE pt='2011.11.01.21')TABLE t_wa_funnel_distinct PARTITION ( -
SELECT
-
bussiness_id, -
cookie_id, -
session_id, -
funnel_id, -
group_first(funnel_name) funnel_name, -
step_id, -
group_first(step_name) step_name, -
group_first(log_type) log_type, -
group_first(url_pattern) url_pattern, -
group_first(url) url, -
group_first(refer) refer, -
group_first(log_time) log_time, -
group_first(is_new_visitor) is_new_visitor, -
group_first(is_mobile_traffic) is_mobile_traffic, -
group_first(is_bounce) is_bounce, -
group_first(campaign_name) campaign_name, -
group_first(group_name) group_name, -
group_first(slot_name) slot_name, -
group_first(source_type) source_type, -
group_first(next_page) next_page, -
group_first(continent) continent, -
group_first(sub_continent_region) sub_continent_region, -
group_first(country) country, -
group_first(region) region, -
group_first(city) city, -
group_first(language) language, -
group_first(browser) browser, -
group_first(os) os, -
group_first(screen_color) screen_color, -
group_first(screen_resolution) screen_resolution, -
group_first(flash_version) flash_version, -
group_first(java) java, -
group_first(host) host -
FROM
-
(
SELECT* -
FROM r_wa_funnel -
WHERE pt='2011.11.01.21' -
distribute by bussiness_id, cookie_id, session_id, funnel_id, step_id sort by log_time ASC -
)
t1 -
GROUP
BY bussiness_id, cookie_id, session_id, funnel_id, step_id;
执行时间:
第一个需要执行6:43, 而优化有只要执行0:35秒,性能得到大幅提升select orderid,fenjian,timee
from
(
select orderid,fenjian,timee,row_number(orderid,fenjian) rn
from (
select orderid,fenjian,timee from tableName
distribute by orderid,fenjian sort by orderid,fenjian,timee asc
) t1
) t2
where t2.rn=1
select a.id,a.info,b.num from a join b on a.id=b.id and where b.num>=10
两个表做关联,首先where会过滤掉不需要的数据。
至于表怎么做map和reduce操作,在hive里的表是虚拟的,其实还是对hdfs文件进行操作,你可以在hdfs:///user/hive/warehouse路径下找到以表名来命名的文件,里面就是表的内容,可以执行-cat命令查看。所以,它的map操作很简单,就是按行读文件,然后会根据hive的默认分隔符\001对每行进行切分。切分完成后就会按照你SQL指定的逻辑进行合并,最后再输出成hdfs文件,只不过在hive里面看它是以表的形式展现的。
job数会在你执行sql语句之后紧接着有相应的日志记录,
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 2
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
这样就是有两个job,正在执行第一个job。
Hadoop job information for Stage-1: number of mappers: 5; number of reducers: 2
而这个就会告诉你有多少个mapper和reducer。
像你写的这个sql有join操作,而且是hiveSQL里面最普通的join,那么一定会有reducer参与,如果数据量很大,比如上千万条记录,join就会特别慢,job进度就会一直卡在reduce操作。可以改成mapjoin或者sort merge bucket mapjoin。
其实hive效率不高,不适合实时查询,即使一个表为空,用hive进行查询也会很耗时,因为它要把sql语句翻译成MR任务。虽然简化了分布式编程,但是效率上就会付出代价。
你的这句sql应该会翻译成一个JOB来执行,就是简单地map和reduce。
mapreduce就是按行读文件,然后切分,合并,输出成文件。