欢迎投稿

今日深度:

Programming Hive ( Hive编程指南) 五,

Programming Hive ( Hive编程指南) 五,


6.8 类型转换

cast函数:STRING如何转FLOAT

--先将salary转换为float类型,然后再比较大小
        --cast(value as type)   如果 value不合法,Hive返回NULL
select name,salary from employees
where cast(salary as float) < 100000.0;
    --float转换为int,使用round()或者floor()函数

转换Binary值 

--转换BINARY值
        --Hive0.8中只支持Binary与STring互转,如果确定Binary是数值,可以使用cast()嵌套转换
select (2.0 * cast(cast(b as string) as double)) from src;

6.9 抽样查询

需要有代表性的查询结果而不是全部结果,Hive可以通过对表分桶抽样满足这个需求

--numbers表只有number字段,值为1-10;
        --使用rand函数进行抽样,这个函数会返回一个随机数
select * from numbers tablesample(bucket 3 out of 10 on rand()) s;
2
4
--使用rand函数每次抽取结果不一致
select * from numbers tablesample(bucket 3 out of 10 on number) s;
2
--按照指定列,每次执行同一语句返回结果一致
select * from numbers tablesample(bucket 1 out of 2 on number) s;
2
4
6
8
10
select * from numbers tablesample(bucket 2 out of 2 on number) s;
1
3
5
7
9
--分母表示数据会被散列的桶的个数,分子表示将会选择的桶的个数

 

6.9.1 数据块抽样

--按照抽样百分比进行抽样,基于行数
select * from numbersflat tablesample(0.1 percent) s;
--抽样的最小单元式一个HDFS数据块,因此,如果表的数据大小小于普通的块大小128M,那么将会返回所有行

基于百分比的抽样方式提供了一个变量,用于控制基于数据块的调优的种子信息:

<property>
    <name>hive.sample.seednumber</name>
    <value>0<value>
    <description>A number used for pencentage sampling.By changing this number, user will change the subsets of data samples.</description>
</property>

 

6.9.2 分桶表的输入裁剪

TABLESAMPLE语句中指定的列和cluster by语句中指定的列相同,那么tablesample查询就只会扫描涉及到的表的分区下的数据

create table numbers_bucketed (number int) CLUSTERED BY (number) INTO 3 BUCKETS;

set hive.enfoorce.bucketing=true;
--开启bucket功能

insert overwrite table numbers_bucketed select number from numbers;
--在对应的数据库目录下就能查到对应的bucket文件:3个

dfs -cat /user/hive/warehouse/nyumbers_bucketed/000001_0;
1
7
10
4

select * from numbers_bucketed tablesample (BUCKET 2 out of 3 on number) s;
1
7
10
4

--查询文件与在Hive上查询结果一致

6.10  UNION ALL

SELECT log.ymd,log.level,log.message
    from (
        select 11.ymd,11.level,11.message,'Log1' as source
            from log1 ll
    UNION ALL
        select 12.ymd,12.level,12.message,'Log2' as source
            from log1 12 
) log
sort by log.ymd ASC;

--可以将2个或多个表进行合并,每一个union子查询都必须具有相同的列,且每个字段的字段类型必须一致

 

Chapter7      HiveQL:视图

Chapter8      HiveQL:索引

--无索引
create table employees (
    name    string,
    salary  float,
    subordinates    Array<String>,
    deductions    Map<string,float>,
    address    STRUCT<street:string,city:string,state:string,zip:int>
)
partitioned by (country string, state string);

--对分区字段country建立索引
create index employees_index
on table employees (country)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
with DEFERRED REBUILD
idxproperties ('creator'='me','created_at'='some_time')
IN TABLE employees_index_table
partitioned by (country,name)
comment 'Employees indexed by country and name'

Bitmap索引

create index employees_index
on table employees (country)
AS 'BITMAP'
with DEFERRED REBUILD
idxproperties ('creator'='me','created_at'='some_time')
IN TABLE employees_index_table
partitioned by (country,name)
comment 'Employees indexed by country and name'

--BITMAP普遍应用于排重后值较少的列

8.3 显示索引

show formatted index on employees;
--显示索引

8.4删除索引

drop index if exists employees_index on table employees;

 

chapter 9    模式设计

9.1 按天划分的表

--每天建一个表,太麻烦
create table supply_2011_01_02 (id int,part string,quantity int);
create table supply_2011_01_03 (id int,part string,quantity int);
create table supply_2011_01_04 (id int,part string,quantity int);

...load data ..

select part, quantity from supply_2011_01_02
    union all
select part,quantity from supply_2011_01_03
where quantity < 4;

优化:在表中添加分区

create table supply (id int,part string,quantity int) partitioned by (day int);

--添加分区
alter table supply add partition (day=20110102)
alter table supply add partition (day=20110103)
alter table supply add partition (day=20110104)

--按照分区查找
select part, quantity from supply
where day <= 20110102 and day < 20110103 and quantity < 4;

9.2 关于分区:虽然有可能优化查询,但是也可能产生不利

create table weblogs (url string,time long)
partitioned by (day int,state string,city string);

select * from weblogs where day=20110102;

--HDFS用来存储数百万的大文件,而非数十亿的小文件,过多分区会导致创建大量非必须的hadoop文件和文件夹,一个分区对应一个包含多个文件的文件夹
--MR将一个任务(job)转换成多个任务(task),每个task都是一个JVM实例,需要开启和销毁的开销
    --每个文件对应一个task,有时候,JVM开启和销毁的时间比处理数据的时间还要长

解决方法:

1.按照时间粒度来确定合适大小的数据积累量,而且安装这个时间粒度。随着时间推移,保证分区的数据量的增长是均匀的,而且分区下包含的文件大小是系统中块的大小或块大小的倍数。

2.使用两个级别的分区并且使用不同的维度

--一级分区按照天划分,二级分区按照州名划分
create table weblogs (url string,time long,city string)
partitioned by (day int,state string);

select * from weblogs where day = 20110102;

--但是有的state的数据也会偏多,导致map task时间过长,如果不能找到好的,大小相对合适的分区方式的话,可以考虑使用9.6节的分桶存储

9.3 唯一键和标准化

Hive没有主键或者基于序列秘钥生成的自增键的概念

 

9.4 同一份数据多种处理

HiveQL可以从一个数据源产生多个数据聚合,而无需每次聚合都要重新扫描一次

--都从源表中读取数据,然后倒入到两个表中
insert overwrite table sales
select * from history where action = 'purchased';

insert overwrite table credits
select * from history where action = 'returned';
--语法正确,但是效率低下,需要查询两次源表

优化方案:只需查询一次源表

from history
insert overwrite sales select * where action = 'purchases';
insert overwrite credits select * where action = 'returned';
--只查询一次源表

9.5 对于每个区的分表

insert overwrite table distinct_ip_in_logs
select distinct(ip) as ip from weblogs
where hit_date = 2011-01-01


create table state_city_for_day (state string,city string);

insert overwrite state_city_for_day
select distinct(state,city) from distinct_ip_in_logs
JOIN geodata ON (distinct_ip_in_logs.ip=geodata.ip);

--虽然是有效的,但是当计算某一天的数据时会导致前一天的数据被insert overwrite语句覆盖掉

解决办法:在整个过程中使用分区(还能允许用户对中间数据按照日期进行比较)

insert overwrite table distinct_ip_in_logs
partition (hit_date = 2011-01-01)
select distinct(ip) as ip from weblogs
where hit_date = 2011-01-01

create table state_city_for_day (state string,city string)
partitioned by (hit_date string);

insert overwrite table state_city_for_day partition(dt = 2011-01-01)
select distinct(state,city) from distinct_ip_in_logs
join geodata on (distinct_ip_in_logs.ip=geodata.ip)
where (hit_date=2011-01-01)
--有一个缺陷:用户需管理中间表并删除旧分区

9.6 分桶表数据存储

--一级分区是dt,二级分区是user_id,可能会有太多的小分区。用户使用动态分区创建,Hive默认会限制动态分区可以创建的最大分区数,用来避免由于创建太多的分区导致超过了文件系统的处理能力,因此,以下命令可能会执行失败。

create table weblog (url string,source_ip string)
partition by (dt string,user_id int);


from raw_weblog
insert overwrite table page_view partition(dt='2012-06-08',user_id)
select server_name,url,source_ip,dt,user_id;

对weblog进行分桶,使用user_id字段作为分桶字段,则字段值会根据用户指定的值进行哈希分到桶中,同一个user_id会存到同一个桶内,假设用户远大于桶数,那么每个桶内就会包含多个用户的记录:

create table weblog (user_id int,url string,source_ip string)
partitioned by (dt string)
clustered by (user_id) into 96 buckets;

--创建表仅定义元数据,不影响实际填充表的命令

--insert时,需要先设置一个属性强制Hive为目标表的分桶初始化过程设置一个正确的reducer个数,然后再执行查询来填充分区

set hive.enforce.bucketing= true;

from raw_logs
insert overwrite table weblog
partition (dt='2009-02-05')
select user_id,url,source_ip where dt = '2009-02-05';

9.7 为表增加列

create table weblogs (version long, url string)
partitioned by (hit_date int)
row format delimited fields terminated by '\t';


cat log1.txt
1 /mystuff
1 /toys

--加载数据
load data local inpath 'logq.txt' int weblogs partition(20110101);

--读取数据
select * from weblogs;
1 /mystuff 20110101
1 /toys 20110101


--增加字段
cat log2.txt
2 /cars bob
2 /stuff terry


alter table weblogs add columns (user_id string);


--加载数据
load data local inpath 'log2.txt' int weblogs partition(20110102);


select * from weblogs;
1 /mystuff 20110101
1 /toys 20110101
2 /cars 20110102 bob
2 /stuff 20110102 terry

9.8 使用列表存储

select distinct(state) from weblogs;
NY
NJ

cHAPTER 10 调优

10.3 限制调整

limit语句是查询所有数据,再返回部分结果,Hive有一个配置属性可以开启当使用limit语句时,对于元数据进行抽样

select
<prorerty>
    <name>hive.limit.optimize.enable</name>
    <value>true</value>
    <description>Whether to enable to optimization to try a smaller subset of data for simple LIMIT first.</description>
</prorerty>

--一旦hive.limit.optimize.enable设置为true,还有两个参数可以控制这个操作,即hive.limit.row.max.size和hive.limit.optiomize.file:

<prorerty>
    <name>hive.limit.row.max.size</name>
    <value>100000</value>
    <description>when trying a smaller subset of data for simple LIMIT,how much size we need to guarantee each row to have at least.</description>
</prorerty>

<prorerty>
    <name>hive.limit.optiomize.file</name>
    <value>10</value>
    <description>when trying a smaller subset of data for simple LIMIT,maximum number of files we can sample.</description>
</prorerty>


--缺点:输入的数据永远不会被处理到,比如任意的一个需要REDUCE的步骤查询,JOIN和GROUP BY等操作,一级聚合函数的大多数调用等等;

10.5 本地模式

Hive输入数据量非常小,查询触发执行任务的时间消耗可能比实际job的执行时间要多得多,对于此,Hive可以通过本地模式在单台机器上处理所有的任务。

set oldjobtracker=${hiveconf:mapred.job.tracker};
set mapred.job.tracker=local;
set mapred.tmp.dir=/home/edward/tmp;

select * from people where firstname = bob;
...

set mapred.job.tracker=${oldjobtracker};


--可以将hive.exec.model.local.auto的值设为true,增加到hive-site.xml文件中
<property>
    <name>hive.exec.mode.local.auto</name>
    <value>true</value>
    <description>
        ...
    </description>
</property>

 

10.6 并行执行

<property>
    <name>hive.exec.parallel</name>
    <value>true<value>
    <description>Whether to execute jobs in parallel</description>
</property>

10.7 严格模式

--可以禁止3类查询
hive.mapred.mode=strict;

--1.分区表:除非where语句中含有分区字段过滤条件来限制数据范围,否则不允许执行(用户不允许扫描所有分区):
    --原因:分区表通常有非常大的数据集,且数据增加迅速
select distinct(planner_id) from fracture_ins where planner_id=5;
FAILED:Error in semantic analysis: no partition Predicate Found for Alias "fracture_ins" Table "fracture_ins"

--增加分区过滤条件之后(限制表分区):
select distinct(planner_id) from fracture_ins
where planner_id=5 and hit_hate=20120101;
...normal results...


---------------------------------------------------------------------

--2.使用ORDER BY 语句,要求必须使用limit,因为order by为了执行排序过程会将所有结果数据发到同一个reducer中,强制要求用户增加这个limit语句可以防止reducer额外执行很长一段时间:
select * from fracture_ins where hit_date > 2012 ORDER BY planner_id;
FAILED:...

--加上limit语句就可以解决:
select * from fracture_ins where hit_date > 2012 ORDER BY planner_id
limit 1000;
--输出结果


--3.笛卡尔积的查询
select * from fracture_act JOIN fracture_ads
where fracture_act.planner_id = fracture_ads.planner_id;
FAILED:...

--使用on语句
select * from fracture_act join fracture_ads
ON (fracture_act.planner_id = fracture_ads.planner_id);
...normal results...

10.8 调整mapper和reducer个数

--过多的maper和reducer会导致启动阶段、调度和运行job过程中产生过多的开销; 如果太少,就可能没有充分利用集群内在的并行性


--Hive查询具有reduce过程时,CLI控制台会打印出调优后的reducer个数。GROUP BY 需要reduce,很多查询会转换成只需要map阶段的任务:

select pixel_id,count from fracture_ins where hit_date=20120119
group by pixel_id;
Total MapReduce jobs = 1
...


---------------------------------

--Hive根据数据量大小确定reducer个数。
--hive.exec.reducers.bytes.per.reducer的默认值是1GB


--还可以将reducer的个数设置为固定值,Hive默认reducer个数是3个


----------------------------------------

--共享大集群处理大任务时,为了控制资源利用情况,hive.exec.reducers.max显得很重要。Hadoop提供的
--map和reduce个数(也称为插槽)是固定的,某个大job就可能使用完所有的槽,导致其他job无法继续
hive.exec.reducers.max
--上面配置可以控制job利用资源的情况,配置到hive-site.xml文件中,属性值计算公式为:(集群总reduce槽位个数 * 1.5) / (执行中的查询的平均个数)

--1.5是经验系数,用于防止未充分利用集群的情况

10.9 JVM重用:使得JVM实例在同一个job中重新使用N次 ,N的值可以在hadoop的mapred-site.xml文件中进行设置

select 
<property>
    <name>mapred.job.reuse.jvm.num.tasks</name>
    <value>10<value>
    <description>...</description>
</property>


--缺点:开启JVM重用将会一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放,
--如果某个不平衡的job中有几个reduce task执行的时间要比其他reduce task消耗时间多得多的话,那么保留的插槽就会一直空闲着
--无法被其他的job使用,知道所有的task都结束了才会释放

10.10 索引:用来加快含有GROUP BY的语句查询的计算速度

10.11 动态分区调整

动态分区insert语句可以通过简单的select语句向分区表中创建很多新的分区

Hive可以通过配置限制动态分区插入允许雄黄间的分区数在1000个左右

在hiv-site.xml配置文件设置动态分区模式为严格模式(属性值为strict),然后增加相关属性信息,通过如下属性来限制查询可以创建的最大动态分区个数,P145

还可以配置设置DataNode上一次打开文件个数,这个设置必须在DataNode的hdfs-site.xml配置文件中。

10.12 推测执行 P146

10.13 单个MR中多个group by

试图将查询中的多个group by组装到单个MR任务中,想启动这个优化,需要一组常用的GROUP BY键:

<property>
    <name>hive.multigroupby.singlemr</name>
    <value>false</value>
    <description>...</description>
</property>

10.14 虚拟列

P148

www.htsjk.Com true http://www.htsjk.com/hive/32294.html NewsArticle Programming Hive ( Hive编程指南) 五, 6.8 类型转换 cast函数:STRING如何转FLOAT --先将salary转换为float类型,然后再比较大小 --cast(value as type) 如果 value不合法,Hive返回NULLselect name,salary from em...
相关文章
    暂无相关文章
评论暂时关闭