Programming Hive ( Hive编程指南) 五,
6.8 类型转换
cast函数:STRING如何转FLOAT
--先将salary转换为float类型,然后再比较大小
--cast(value as type) 如果 value不合法,Hive返回NULL
select name,salary from employees
where cast(salary as float) < 100000.0;
--float转换为int,使用round()或者floor()函数
转换Binary值
--转换BINARY值
--Hive0.8中只支持Binary与STring互转,如果确定Binary是数值,可以使用cast()嵌套转换
select (2.0 * cast(cast(b as string) as double)) from src;
6.9 抽样查询
需要有代表性的查询结果而不是全部结果,Hive可以通过对表分桶抽样满足这个需求
--numbers表只有number字段,值为1-10;
--使用rand函数进行抽样,这个函数会返回一个随机数
select * from numbers tablesample(bucket 3 out of 10 on rand()) s;
2
4
--使用rand函数每次抽取结果不一致
select * from numbers tablesample(bucket 3 out of 10 on number) s;
2
--按照指定列,每次执行同一语句返回结果一致
select * from numbers tablesample(bucket 1 out of 2 on number) s;
2
4
6
8
10
select * from numbers tablesample(bucket 2 out of 2 on number) s;
1
3
5
7
9
--分母表示数据会被散列的桶的个数,分子表示将会选择的桶的个数
6.9.1 数据块抽样
--按照抽样百分比进行抽样,基于行数
select * from numbersflat tablesample(0.1 percent) s;
--抽样的最小单元式一个HDFS数据块,因此,如果表的数据大小小于普通的块大小128M,那么将会返回所有行
基于百分比的抽样方式提供了一个变量,用于控制基于数据块的调优的种子信息:
<property>
<name>hive.sample.seednumber</name>
<value>0<value>
<description>A number used for pencentage sampling.By changing this number, user will change the subsets of data samples.</description>
</property>
6.9.2 分桶表的输入裁剪
TABLESAMPLE语句中指定的列和cluster by语句中指定的列相同,那么tablesample查询就只会扫描涉及到的表的分区下的数据
create table numbers_bucketed (number int) CLUSTERED BY (number) INTO 3 BUCKETS;
set hive.enfoorce.bucketing=true;
--开启bucket功能
insert overwrite table numbers_bucketed select number from numbers;
--在对应的数据库目录下就能查到对应的bucket文件:3个
dfs -cat /user/hive/warehouse/nyumbers_bucketed/000001_0;
1
7
10
4
select * from numbers_bucketed tablesample (BUCKET 2 out of 3 on number) s;
1
7
10
4
--查询文件与在Hive上查询结果一致
6.10 UNION ALL
SELECT log.ymd,log.level,log.message
from (
select 11.ymd,11.level,11.message,'Log1' as source
from log1 ll
UNION ALL
select 12.ymd,12.level,12.message,'Log2' as source
from log1 12
) log
sort by log.ymd ASC;
--可以将2个或多个表进行合并,每一个union子查询都必须具有相同的列,且每个字段的字段类型必须一致
Chapter7 HiveQL:视图
Chapter8 HiveQL:索引
--无索引
create table employees (
name string,
salary float,
subordinates Array<String>,
deductions Map<string,float>,
address STRUCT<street:string,city:string,state:string,zip:int>
)
partitioned by (country string, state string);
--对分区字段country建立索引
create index employees_index
on table employees (country)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
with DEFERRED REBUILD
idxproperties ('creator'='me','created_at'='some_time')
IN TABLE employees_index_table
partitioned by (country,name)
comment 'Employees indexed by country and name'
Bitmap索引
create index employees_index
on table employees (country)
AS 'BITMAP'
with DEFERRED REBUILD
idxproperties ('creator'='me','created_at'='some_time')
IN TABLE employees_index_table
partitioned by (country,name)
comment 'Employees indexed by country and name'
--BITMAP普遍应用于排重后值较少的列
8.3 显示索引
show formatted index on employees;
--显示索引
8.4删除索引
drop index if exists employees_index on table employees;
chapter 9 模式设计
9.1 按天划分的表
--每天建一个表,太麻烦
create table supply_2011_01_02 (id int,part string,quantity int);
create table supply_2011_01_03 (id int,part string,quantity int);
create table supply_2011_01_04 (id int,part string,quantity int);
...load data ..
select part, quantity from supply_2011_01_02
union all
select part,quantity from supply_2011_01_03
where quantity < 4;
优化:在表中添加分区
create table supply (id int,part string,quantity int) partitioned by (day int);
--添加分区
alter table supply add partition (day=20110102)
alter table supply add partition (day=20110103)
alter table supply add partition (day=20110104)
--按照分区查找
select part, quantity from supply
where day <= 20110102 and day < 20110103 and quantity < 4;
9.2 关于分区:虽然有可能优化查询,但是也可能产生不利
create table weblogs (url string,time long)
partitioned by (day int,state string,city string);
select * from weblogs where day=20110102;
--HDFS用来存储数百万的大文件,而非数十亿的小文件,过多分区会导致创建大量非必须的hadoop文件和文件夹,一个分区对应一个包含多个文件的文件夹
--MR将一个任务(job)转换成多个任务(task),每个task都是一个JVM实例,需要开启和销毁的开销
--每个文件对应一个task,有时候,JVM开启和销毁的时间比处理数据的时间还要长
解决方法:
1.按照时间粒度来确定合适大小的数据积累量,而且安装这个时间粒度。随着时间推移,保证分区的数据量的增长是均匀的,而且分区下包含的文件大小是系统中块的大小或块大小的倍数。
2.使用两个级别的分区并且使用不同的维度
--一级分区按照天划分,二级分区按照州名划分
create table weblogs (url string,time long,city string)
partitioned by (day int,state string);
select * from weblogs where day = 20110102;
--但是有的state的数据也会偏多,导致map task时间过长,如果不能找到好的,大小相对合适的分区方式的话,可以考虑使用9.6节的分桶存储
9.3 唯一键和标准化
Hive没有主键或者基于序列秘钥生成的自增键的概念
9.4 同一份数据多种处理
HiveQL可以从一个数据源产生多个数据聚合,而无需每次聚合都要重新扫描一次
--都从源表中读取数据,然后倒入到两个表中
insert overwrite table sales
select * from history where action = 'purchased';
insert overwrite table credits
select * from history where action = 'returned';
--语法正确,但是效率低下,需要查询两次源表
优化方案:只需查询一次源表
from history
insert overwrite sales select * where action = 'purchases';
insert overwrite credits select * where action = 'returned';
--只查询一次源表
9.5 对于每个区的分表
insert overwrite table distinct_ip_in_logs
select distinct(ip) as ip from weblogs
where hit_date = 2011-01-01
create table state_city_for_day (state string,city string);
insert overwrite state_city_for_day
select distinct(state,city) from distinct_ip_in_logs
JOIN geodata ON (distinct_ip_in_logs.ip=geodata.ip);
--虽然是有效的,但是当计算某一天的数据时会导致前一天的数据被insert overwrite语句覆盖掉
解决办法:在整个过程中使用分区(还能允许用户对中间数据按照日期进行比较)
insert overwrite table distinct_ip_in_logs
partition (hit_date = 2011-01-01)
select distinct(ip) as ip from weblogs
where hit_date = 2011-01-01
create table state_city_for_day (state string,city string)
partitioned by (hit_date string);
insert overwrite table state_city_for_day partition(dt = 2011-01-01)
select distinct(state,city) from distinct_ip_in_logs
join geodata on (distinct_ip_in_logs.ip=geodata.ip)
where (hit_date=2011-01-01)
--有一个缺陷:用户需管理中间表并删除旧分区
9.6 分桶表数据存储
--一级分区是dt,二级分区是user_id,可能会有太多的小分区。用户使用动态分区创建,Hive默认会限制动态分区可以创建的最大分区数,用来避免由于创建太多的分区导致超过了文件系统的处理能力,因此,以下命令可能会执行失败。
create table weblog (url string,source_ip string)
partition by (dt string,user_id int);
from raw_weblog
insert overwrite table page_view partition(dt='2012-06-08',user_id)
select server_name,url,source_ip,dt,user_id;
对weblog进行分桶,使用user_id字段作为分桶字段,则字段值会根据用户指定的值进行哈希分到桶中,同一个user_id会存到同一个桶内,假设用户远大于桶数,那么每个桶内就会包含多个用户的记录:
create table weblog (user_id int,url string,source_ip string)
partitioned by (dt string)
clustered by (user_id) into 96 buckets;
--创建表仅定义元数据,不影响实际填充表的命令
--insert时,需要先设置一个属性强制Hive为目标表的分桶初始化过程设置一个正确的reducer个数,然后再执行查询来填充分区
set hive.enforce.bucketing= true;
from raw_logs
insert overwrite table weblog
partition (dt='2009-02-05')
select user_id,url,source_ip where dt = '2009-02-05';
9.7 为表增加列
create table weblogs (version long, url string)
partitioned by (hit_date int)
row format delimited fields terminated by '\t';
cat log1.txt
1 /mystuff
1 /toys
--加载数据
load data local inpath 'logq.txt' int weblogs partition(20110101);
--读取数据
select * from weblogs;
1 /mystuff 20110101
1 /toys 20110101
--增加字段
cat log2.txt
2 /cars bob
2 /stuff terry
alter table weblogs add columns (user_id string);
--加载数据
load data local inpath 'log2.txt' int weblogs partition(20110102);
select * from weblogs;
1 /mystuff 20110101
1 /toys 20110101
2 /cars 20110102 bob
2 /stuff 20110102 terry
9.8 使用列表存储
select distinct(state) from weblogs;
NY
NJ
cHAPTER 10 调优
10.3 限制调整
limit语句是查询所有数据,再返回部分结果,Hive有一个配置属性可以开启当使用limit语句时,对于元数据进行抽样
select
<prorerty>
<name>hive.limit.optimize.enable</name>
<value>true</value>
<description>Whether to enable to optimization to try a smaller subset of data for simple LIMIT first.</description>
</prorerty>
--一旦hive.limit.optimize.enable设置为true,还有两个参数可以控制这个操作,即hive.limit.row.max.size和hive.limit.optiomize.file:
<prorerty>
<name>hive.limit.row.max.size</name>
<value>100000</value>
<description>when trying a smaller subset of data for simple LIMIT,how much size we need to guarantee each row to have at least.</description>
</prorerty>
<prorerty>
<name>hive.limit.optiomize.file</name>
<value>10</value>
<description>when trying a smaller subset of data for simple LIMIT,maximum number of files we can sample.</description>
</prorerty>
--缺点:输入的数据永远不会被处理到,比如任意的一个需要REDUCE的步骤查询,JOIN和GROUP BY等操作,一级聚合函数的大多数调用等等;
10.5 本地模式
Hive输入数据量非常小,查询触发执行任务的时间消耗可能比实际job的执行时间要多得多,对于此,Hive可以通过本地模式在单台机器上处理所有的任务。
set oldjobtracker=${hiveconf:mapred.job.tracker};
set mapred.job.tracker=local;
set mapred.tmp.dir=/home/edward/tmp;
select * from people where firstname = bob;
...
set mapred.job.tracker=${oldjobtracker};
--可以将hive.exec.model.local.auto的值设为true,增加到hive-site.xml文件中
<property>
<name>hive.exec.mode.local.auto</name>
<value>true</value>
<description>
...
</description>
</property>
10.6 并行执行
<property>
<name>hive.exec.parallel</name>
<value>true<value>
<description>Whether to execute jobs in parallel</description>
</property>
10.7 严格模式
--可以禁止3类查询
hive.mapred.mode=strict;
--1.分区表:除非where语句中含有分区字段过滤条件来限制数据范围,否则不允许执行(用户不允许扫描所有分区):
--原因:分区表通常有非常大的数据集,且数据增加迅速
select distinct(planner_id) from fracture_ins where planner_id=5;
FAILED:Error in semantic analysis: no partition Predicate Found for Alias "fracture_ins" Table "fracture_ins"
--增加分区过滤条件之后(限制表分区):
select distinct(planner_id) from fracture_ins
where planner_id=5 and hit_hate=20120101;
...normal results...
---------------------------------------------------------------------
--2.使用ORDER BY 语句,要求必须使用limit,因为order by为了执行排序过程会将所有结果数据发到同一个reducer中,强制要求用户增加这个limit语句可以防止reducer额外执行很长一段时间:
select * from fracture_ins where hit_date > 2012 ORDER BY planner_id;
FAILED:...
--加上limit语句就可以解决:
select * from fracture_ins where hit_date > 2012 ORDER BY planner_id
limit 1000;
--输出结果
--3.笛卡尔积的查询
select * from fracture_act JOIN fracture_ads
where fracture_act.planner_id = fracture_ads.planner_id;
FAILED:...
--使用on语句
select * from fracture_act join fracture_ads
ON (fracture_act.planner_id = fracture_ads.planner_id);
...normal results...
10.8 调整mapper和reducer个数
--过多的maper和reducer会导致启动阶段、调度和运行job过程中产生过多的开销; 如果太少,就可能没有充分利用集群内在的并行性
--Hive查询具有reduce过程时,CLI控制台会打印出调优后的reducer个数。GROUP BY 需要reduce,很多查询会转换成只需要map阶段的任务:
select pixel_id,count from fracture_ins where hit_date=20120119
group by pixel_id;
Total MapReduce jobs = 1
...
---------------------------------
--Hive根据数据量大小确定reducer个数。
--hive.exec.reducers.bytes.per.reducer的默认值是1GB
--还可以将reducer的个数设置为固定值,Hive默认reducer个数是3个
----------------------------------------
--共享大集群处理大任务时,为了控制资源利用情况,hive.exec.reducers.max显得很重要。Hadoop提供的
--map和reduce个数(也称为插槽)是固定的,某个大job就可能使用完所有的槽,导致其他job无法继续
hive.exec.reducers.max
--上面配置可以控制job利用资源的情况,配置到hive-site.xml文件中,属性值计算公式为:(集群总reduce槽位个数 * 1.5) / (执行中的查询的平均个数)
--1.5是经验系数,用于防止未充分利用集群的情况
10.9 JVM重用:使得JVM实例在同一个job中重新使用N次 ,N的值可以在hadoop的mapred-site.xml文件中进行设置
select
<property>
<name>mapred.job.reuse.jvm.num.tasks</name>
<value>10<value>
<description>...</description>
</property>
--缺点:开启JVM重用将会一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放,
--如果某个不平衡的job中有几个reduce task执行的时间要比其他reduce task消耗时间多得多的话,那么保留的插槽就会一直空闲着
--无法被其他的job使用,知道所有的task都结束了才会释放
10.10 索引:用来加快含有GROUP BY的语句查询的计算速度
10.11 动态分区调整
动态分区insert语句可以通过简单的select语句向分区表中创建很多新的分区
Hive可以通过配置限制动态分区插入允许雄黄间的分区数在1000个左右
在hiv-site.xml配置文件设置动态分区模式为严格模式(属性值为strict),然后增加相关属性信息,通过如下属性来限制查询可以创建的最大动态分区个数,P145
还可以配置设置DataNode上一次打开文件个数,这个设置必须在DataNode的hdfs-site.xml配置文件中。
10.12 推测执行 P146
10.13 单个MR中多个group by
试图将查询中的多个group by组装到单个MR任务中,想启动这个优化,需要一组常用的GROUP BY键:
<property>
<name>hive.multigroupby.singlemr</name>
<value>false</value>
<description>...</description>
</property>
10.14 虚拟列
P148