Hive II,
Hive
数据仓库。
OLAP(online analyze process)
hdfs
元数据关系型数据中。
Hive执行流程
cli交互driver
driver通过编译器进行编译(语法解析和语义解析)
编译器查询metastore进行编译,生成计划。
执行计划返回driver,driver提交执行引擎,
执行引擎再提交作业给hadoop,hadoop返回结果
直至client。
tool,hadoop mr.
技术
hive
hiveserver2
beeline:
分区表 :分区就是目录。
桶表 :桶是文件。
内部表 :删除全删
外部表 :只删除表结构,表结构在metastore中。
结构化数据 :
tinyint
smallint
int
bigint
float
double
decimal
array [,,,,]
struct {“”,12,}
named struct {“key1”:”v1”,”k2”,”v2”}
map {1:”“,,,,}
union {a:[]}
split()函数
explode()
炸裂函数,表生成函数。炸开的是array和map.
cross
优化手段
mapjoin
select /*+MAPJOIN()*/
创建表完整语法
CREATE TABLE employee
(
name string,
arr ARRAY<string>,
struc STRUCT<sex:string,age:int>,
map1 MAP<string,int>,
map2 MAP<string,ARRAY<string>>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|' //默认\001
COLLECTION ITEMS TERMINATED BY ',' //默认\002
MAP KEYS TERMINATED BY ':' //默认\003
LINES TERMINATED BY '\n' //行结束符
STORED AS TEXTFILE; //
Map端连接
//连接暗示/*+ MAPJOIN(employee) */
SELECT /*+ MAPJOIN(employee) */ c.* FROM custs c CROSS JOIN orders o WHERE c.id <> o.cid;
//通过设置自动map端连接转换,实现map连接
set hive.auto.convert.join=true
SELECT c.* FROM custs c CROSS JOIN orders o WHERE c.id <> o.cid;
load data local inpath //上传
load data inpath //移动
load命令可以向分区表加载数据,无法加入指定的桶中.
桶表使用insert into..
//在shell中直接执行hive命令
$>hive -e “select * from mydb.custs”
//在shell命令行下执行hive的脚本
$>hive -f hive.sql
//导出表数据,导出元数据和数据本身
$hive>export table mydb.custs to ‘/user/centos/custs.dat’
//order by,全局排序,使用一个reduce实现全排序,强烈推荐数量用limit配合使用。
//对于每个reduce只要查询limit的数量即可。
$hive>select * from custs limit 3 ;
//设置mapred模式为严格模式,
//1.order by时必须使用limit限制结果数量
//2.如果分区表,必须指定分区。
//sort by是指map端排序
//在每个reduce中按照指定字段排序(asc|desc)
//如果mapred.reduce.tasks=1,等价于order by
//order by时始终使用一个reduce
// 没有hive前缀,是hadoop属性
$hive>set mapred.reduce.tasks=2
//distribute by等价mr的分区过程,按照指定字段进行分区,
//按照哪个列分发,必须出现在select语句中。
$hive>select * from orders distribute by cid sort by prices desc ;
//cluster by是快捷方式,如果使用同一个字段进行distribute和sort,
//可以使用该方式。
$hive>select * from orders cluster by cid ;
排序总结
1.order by
全局排序
2.sort by
reduce内排序
3.distribute by
分区,决定记录按哪个字段分配到分区。
4.cluster by
distribute by x sort by x ;
//函数
//size()提取数据大小
$hive>select size(arr) from emp ;
//是否包含指定元素
$hive>select array_contains(arr,”xx”) from emp ;
//查看所有函数
show functions ;
//
desc formatted function array_contains ;
select current_database() ;
select current_user() ;
select current_date() ;
//case when == switch case
SELECT CASE WHEN length(name) <= 4 THEN ‘Short’ ELSE ‘Long’ END as xx FROM emp ;
//倒序字符串
select reverse(“12345”) ;
SELECT reverse(split(reverse(‘/home/user/employee.txt’),’/’)[0])
//创建数组对象
select array(1,1,1,2,3,4) ;
//collect_set()聚合函数,对结果集进行聚合,返回一个集合。
SELECT collect_set(work_place[0]) AS flat_workplace0 FROM employee;
//虚列,内置列
select INPUT__FILE__NAME from emp ;
//事务,hive 0.13.0之后完全支持行级acid事务处理。
//所有事务都是自动提交,并且存储文件只能是orc文件,而且只能在桶表中使用。
1.设置相关属性
SET hive.support.concurrency = true;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;
2.显式事务命令:
SHOW TRANSACTIONS;
3.操作语法
INSERT INTO TABLE tablename [PARTITION (partcol1[=val1], partcol2[=val2]…)] VALUES values_row [, values_row …];
UPDATE tablename SET column = value [, column = value…] [WHERE expression]
DELETE FROM tablename [WHERE expression]
4.创建表时,使用桶表,orc文件格式,支持事务的属性
create table tx(id int ,name string , age int)
clustered by (id) into 2 buckets
stored as orc
TBLPROPERTIES(‘transactional’=’true’);
5.执行操作
insert into tx(id,name,age) values(1,’tom’,2) ;
update tx set name = ‘tomas’ where id = 1 ;
delete from tx where id =1 ;
面向行存储
结构数据。
select name from orders ;
磁盘寻址非线性。
面向列存储
线性的。
orc
数据聚合与采样
count()
sum()
avg()
max()
min()
//查询每个customer的订单数
select cid,count(*) from orders group by cid ;
//错,select字段必须出现在group by中。
select cid,orderno ,count(*) from group by cid ;
//去重集合
select cid,collect_set(price) from group by cid ;
//select中出现多个聚合函数
select cid,max(price),min(price) from group by cid ;
//coalesce
返回第一个非空参数
SELECT sum(coalesce(sex_age.age,0)) AS age_sum,
sum(if(sex_age.sex = 'Female',sex_age.age,0))
AS female_age_sum FROM employee;
//不允许嵌套聚合,一下语句错误
SELECT avg(count(*)) AS row_cnt ;
//如果使用count + distinct组合,mapred.reduce.tasks属性失效,使用
//使用一个reduce进行,类似于order by
//map端聚合,预聚合,消耗更多内存。默认false
set hive.map.aggr=true
高级聚合
GROUPING SETS.
group by + union all
//查询每个cust的订单数
select count(*) from orders group by cid ;
select count(*) from orders group by orderno ;
//group + union all
select count(*) from orders group by cid union all select count(*) from orders group by orderno ;
//group by :指定按照哪些字段分组,
//grouping sets : 以上字段集如何组合。
select count(*) from orders group by cid,orderno grouping sets(cid,orderno,()) ;
//
rollup
rollup扩展了group by,
rollup比grouping sets多了一层聚合(n + 1)。
GROUP BY a,b,c WITH ROLLUP
GROUP BY a,b,c GROUPING SETS ((a,b,c),(a,b),(a),())
select cid,orderno,count(*) GROUP BY cid,orderno GROUPING SETS ((cid,orderno),(cid),())
cube
扩展了grouping sets,做了各种条件的组合,不做排序。
//代金券
//vip
//6点
等价于
GROUP BY a,b,c GROUPING SETS ((a,b,c),(a,b),(b,c),(a,c),(a),(b),(c),())
聚合条件
having,用于在组内过滤。
//使用having
select cid , max(price) mx from orders group by cid having mx > 100.1 ;
//嵌套子查询
select t.cid , t.mx from (select cid , max(price) mx from orders group by cid) t where t.mx > 100.1 ;
分析函数
0.11之后支持的,扫描多个输入的行计算每行的结果。通常和OVER, PARTITION BY, ORDER BY,windowing
配合使用。和传统分组结果不一样,传统结果没组中只有一个结果(max)。
分析函数的结果会出现多次,和每条记录都连接输出。
Function (arg1,..., argn) OVER ([PARTITION BY <...>] [ORDER BY <....>] [<window_clause>])
SELECT name, dept_num, salary,
COUNT(*) OVER (PARTITION BY dept_num) AS row_cnt,
SUM(salary) OVER(PARTITION BY dept_num ORDER BY dept_num) AS deptTotal,
SUM(salary) OVER(ORDER BY dept_num) AS runningTotal1,
SUM(salary) OVER(ORDER BY dept_num, name rows unbounded preceding) AS runningTotal2
FROM employee_contract
ORDER BY dept_num, name;
//宏观使用cid排序整个数据集,在分区内按照id降序排列。
SELECT id, orderno, price,cid ,
COUNT(*) OVER (PARTITION BY cid) AS cnt ,
min(price) over (partition by orderno order by id desc) FROM orders ORDER BY cid;
//
SELECT id, orderno, price,cid ,
min(price) over (partition by orderno) FROM orders ORDER BY cid;
//order by每条记录内取.
SELECT id, orderno, price,cid ,
min(price) over (order by price desc) FROM orders ORDER BY cid;
//分区都是独立分区,不是嵌套再分区
SELECT id, orderno, price,cid ,
COUNT(*) OVER (PARTITION BY cid) AS cnt ,
min(price) over (partition by orderno) FROM orders ORDER BY cid;
//分区内排序
SELECT id, orderno, price,cid ,
min(price) over (partition by cid order by price desc) FROM orders ORDER BY cid;
//rank
SELECT id, orderno, price,cid ,
RANK() OVER (PARTITION BY cid ORDER BY price) FROM orders ORDER BY cid;
//dense_rank
SELECT id, orderno, price,cid ,
dense_rank() over (partition by cid) FROM orders ORDER BY cid;
//row_number()
SELECT id, orderno, price,cid ,
row_number() over (partition by cid) FROM orders ORDER BY cid;
//CUME_DIST:
//PERCENT_RANK
currow-1 / totalrow - 1
1: 1 - 1 / 3 - 1 = 0
2: 2 - 1 / 3 - 1 = 0.5
3: 3 - 1 / 3 - 1 = 1
//NTILE:
CREATE TABLE employee
(
name string,
dept_num int,
salary float
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
SELECT name, dept_num, salary,
RANK() OVER (PARTITION BY dept_num ORDER BY salary) AS rank,
DENSE_RANK() OVER (PARTITION BY dept_num ORDER BY salary) AS dense_rank,
ROW_NUMBER() OVER () AS row_num,
ROUND((CUME_DIST() OVER (PARTITION BY dept_num ORDER BY salary)), 2) AS cume_dist,
PERCENT_RANK() OVER(PARTITION BY dept_num ORDER BY salary) AS percent_rank,
NTILE(4) OVER(PARTITION BY dept_num ORDER BY salary) AS ntile
FROM employee ORDER BY dept_num;
CUME_DIST:累加分布
current row_num/ total rows,如果重复行,都取相同末尾行的行号。
例如:
1: 2 / 3 = 0.67
1: 2 / 3 = 0.67
2: 3 / 3 = 1
1: 1 / 3 = 0.33
2: 3 / 3 = 1
2: 3 / 3 = 1
1: 3 / 3 = 1
1: 3 / 3 = 1
1: 3 / 3 = 1
percent_rank
currentrow - 1 / totalrow - 1
类似于cume_dist,但是提取相同rank的首行行号。
1: 1 - 1 / 3 - 1 = 0
1: 1 - 1 / 3 - 1 = 0
2: 3 - 1 / 3 - 1 = 1
1: 1 - 1 / 3 - 1 = 0
2: 2 - 1 / 3 - 1 = 0.5
2: 2 - 1 / 3 - 1 = 0.5
NTile
对每条记录分配桶的编号,桶的个数.指定桶的数。