Hive in Practice (Part 2)
Exercise 1: Buckets
DDL script: create_rating_table_b.sql (creates the bucketed table)
create external table rating_table_b
(userId INT,
movieId STRING,
rating STRING
)
clustered by (userId) into 32 buckets;
The table has three columns: userid, movieid, and rating.
clustered by (userid) into 32 buckets: the data is split into 32 buckets by userid; Hive takes userid mod 32 to decide which bucket (and hence which reducer) each row goes to.
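For an INT bucket column, Hive's hash value is the integer itself, so the bucket index comes down to userid % 32. A quick illustration (hypothetical Python, just the arithmetic from the note above):

def bucket_index(userid, num_buckets=32):
    # For integer columns Hive's hash is the value itself,
    # so the bucket is simply the value modulo the bucket count.
    return userid % num_buckets

print(bucket_index(373))  # 21 -> the row lands in bucket file 000021_0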
Create the table:
hive -f create_rating_table_b.sql
List the tables:
hive> show tables;
OK
movie_table
rating_table
rating_table_b
rating_table_p
Time taken: 0.042 seconds, Fetched: 4 row(s)
hive> desc rating_table_b;
OK
userid int
movieid string
rating string
Time taken: 0.133 seconds, Fetched: 3 row(s)
hive> desc formatted rating_table_b;
OK
# col_name data_type comment
userid int
movieid string
rating string
# Detailed Table Information
Database: default
Owner: root
CreateTime: Sun May 26 15:29:30 CST 2019
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: hdfs://master:9000/user/hive/warehouse/rating_table_b
Table Type: EXTERNAL_TABLE
Table Parameters:
EXTERNAL TRUE
transient_lastDdlTime 1558855770
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: 32
Bucket Columns: [userid]
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.117 seconds, Fetched: 29 row(s)
Turn on bucket enforcement, so the insert below automatically writes the correct number of buckets:
hive> set hive.enforce.bucketing=true;
The table is empty at this point:
hive> select * from rating_table_b;
OK
Time taken: 0.112 seconds
Insert data:
hive> from rating_table
> insert overwrite table rating_table_b
> select userid,movieid,rating;
This populates rating_table_b from the existing rating_table.
Check the buckets (there should be 32 files):
[root@master 06.bucket]# hadoop fs -ls /user/hive/warehouse/rating_table_b
Found 32 items
-rwxr-xr-x 3 root supergroup 33490 2019-05-26 15:36 /user/hive/warehouse/rating_table_b/000000_0
-rwxr-xr-x 3 root supergroup 33157 2019-05-26 15:36 /user/hive/warehouse/rating_table_b/000001_0
-rwxr-xr-x 3 root supergroup 26593 2019-05-26 15:36 /user/hive/warehouse/rating_table_b/000002_0
...
-rwxr-xr-x 3 root supergroup 38325 2019-05-26 15:37 /user/hive/warehouse/rating_table_b/000030_0
-rwxr-xr-x 3 root supergroup 33348 2019-05-26 15:37 /user/hive/warehouse/rating_table_b/000031_0
Inspect one file's contents to spot-check the hashing:
[root@master 06.bucket]# hadoop fs -text /user/hive/warehouse/rating_table_b/000021_0 | head -3
373 2114 3.0
373 2109 4.0
373 2108 4.0
373 % 32 = 21, so userid 373 correctly landed in bucket file 000021_0.
Sampling: with 32 buckets, TABLESAMPLE(BUCKET 2 OUT OF 16) reads 32/16 = 2 of them, namely bucket 2 and bucket 18 (2 + 16):
hive> select * from rating_table_b tablesample(bucket 2 out of 16 on userid) limit 2;
OK
33 2470 4.0
33 2507 1.0
Time taken: 0.074 seconds, Fetched: 2 row(s)
33 % 32 = 1 ==> bucket index 1, i.e. the 2nd bucket, which is exactly one of the buckets we sampled.
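In general, TABLESAMPLE(BUCKET x OUT OF y) on a table with n buckets reads n/y of them: bucket x first, then every y-th bucket after it. A small Python sketch of that selection rule (illustrative only, assumes y divides n):

def sampled_buckets(x, y, n):
    # 1-based bucket numbers read by TABLESAMPLE(BUCKET x OUT OF y)
    return list(range(x, n + 1, y))

print(sampled_buckets(2, 16, 32))  # [2, 18], matching the query above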
User-defined functions
UDF: UpperCase.java (converts lowercase to uppercase)
hive> add jar /usr/local/src/test3/hive/07.ud_func/hive-1.0-SNAPSHOT.jar;
Added [/usr/local/src/test3/hive/07.ud_func/hive-1.0-SNAPSHOT.jar] to class path
Added resources: [/usr/local/src/test3/hive/07.ud_func/hive-1.0-SNAPSHOT.jar]
Create the function:
hive> create temporary function upper_func as 'Uppercase';
Call the function and compare against the raw column:
hive> select title ,upper_func(title) from movie_table limit 3;
OK
title TITLE
Toy Story (1995) TOY STORY (1995)
Jumanji (1995) JUMANJI (1995)
Time taken: 0.206 seconds, Fetched: 3 row(s)
UDAF
UDTF
Data file: hive_udtf.input.data
Format: token:score pairs, separated by ';'
Goal: turn the input 1:0.1;2:0.2 into one row per pair:
1 0.1
2 0.2
Expolde.java (compiled into the jar added below; the class name really is spelled 'Expolde')
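The Expolde source isn't reproduced in these notes; stripped of the Hive UDTF boilerplate, the expansion it performs amounts to the following (plain-Python illustration, not the actual Java):

def explode(data):
    # '1:0.1;2:0.2' -> ('1', '0.1'), ('2', '0.2')
    for pair in data.split(';'):
        token, score = pair.split(':')
        yield token, score

for token, score in explode('1:0.1;2:0.2'):
    print('%s\t%s' % (token, score))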
hive> add jar /usr/local/src/test3/hive/07.ud_func/udtf/hive-1.0-SNAPSHOT.jar;
Added [/usr/local/src/test3/hive/07.ud_func/udtf/hive-1.0-SNAPSHOT.jar] to class path
Added resources: [/usr/local/src/test3/hive/07.ud_func/udtf/hive-1.0-SNAPSHOT.jar]
create_udtf_table.sql:
create external table udtf_test_table
(data STRING
)
row format delimited fields terminated by '\t'
stored as textfile
location '/hive/hive_udtf_dir';
Create the table:
hive -f create_udtf_table.sql
Create the function:
hive> create temporary function explode_func as 'Expolde';
Upload the data:
hadoop fs -put hive_udtf.input.data /hive/hive_udtf_dir
Call the function:
hive> select explode_func(data) from udtf_test_table limit 3;
OK
1 0.1
2 0.2
3 0.3
UDTF-style transform in shell
transform.awk:
{
    print $1"_"$2
}
Equivalent to the following command:
head movies.csv | awk -F',' '{print $1"_"$2}'
Add the file, then call transform:
hive> add file /usr/local/src/test3/hive/08.transform/transform.awk;
Added resources: [/usr/local/src/test3/hive/08.transform/transform.awk]
hive> select transform(movieid, title) using "awk -f transform.awk" as (uuu) from movie_table limit 3;
...
Total MapReduce CPU Time Spent: 2 seconds 680 msec
OK
movieId_title
1_Toy
2_Jumanji
Time taken: 86.935 seconds, Fetched: 3 row(s)
Note the truncated titles ('1_Toy' instead of '1_Toy Story (1995)'): awk's default field separator splits on any whitespace, so $2 only captures the first word of the title.
UDTF-style transform in Python
transform.py:
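The transform.py source isn't reproduced in these notes; a minimal sketch of what it presumably contains. Hive's transform() streams each row to stdin as tab-separated columns, so splitting on the tab (unlike awk's whitespace split) keeps the spaces inside the title, matching the output below:

import sys

for line in sys.stdin:
    # each input row arrives as movieid<TAB>title
    fields = line.rstrip('\n').split('\t')
    if len(fields) >= 2:
        print('%s_%s' % (fields[0], fields[1]))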
hive> add file /usr/local/src/test3/hive/08.transform/transform.py;
Added resources: [/usr/local/src/test3/hive/08.transform/transform.py]
hive> select transform(movieid, title) using "python transform.py" as (uuu) from movie_table limit 3;
...
Total MapReduce CPU Time Spent: 2 seconds 10 msec
OK
movieId_title
1_Toy Story (1995)
2_Jumanji (1995)
Time taken: 21.894 seconds, Fetched: 3 row(s)
Wordcount: The_Man_of_Property.txt
Create a table docs for the document: many lines of text, one string column:
hive> create table docs(line string);
OK
Time taken: 0.118 seconds
Load the data:
hive> load data local inpath '/usr/local/src/test3/hive/08.transform/The_Man_of_Property.txt' overwrite into table docs;
Loading data to table default.docs
Table default.docs stats: [numFiles=1, numRows=0, totalSize=632207, rawDataSize=0]
OK
Time taken: 0.465 seconds
Query the table:
select * from docs limit 2;
Create the second table (can also be done with create_word_count.sql):
hive> create table word_count(word string, count int)
> row format delimited fields terminated by '\t';
Parse the raw lines and write the word counts into the word_count table.
Add mapper.py and red.py (both sketched below):
hive> add file /usr/local/src/test3/hive/08.transform/mapper.py;
Added resources: [/usr/local/src/test3/hive/08.transform/mapper.py]
hive> add file /usr/local/src/test3/hive/08.transform/red.py;
Added resources: [/usr/local/src/test3/hive/08.transform/red.py]
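Neither script's source appears in these notes; what follows are minimal sketches of the usual streaming-style implementations. mapper.py reads raw lines from stdin and emits one word<TAB>1 pair per token (the tokenizer regex is an assumption):

import sys
import re

for line in sys.stdin:
    for word in re.findall(r"[a-z']+", line.lower()):
        print('%s\t1' % word)

Because cluster by word sends all rows for a word to the same reducer, sorted, red.py only has to sum runs of equal words:

import sys

current, total = None, 0
for line in sys.stdin:
    word, count = line.rstrip('\n').split('\t')
    if word != current:
        if current is not None:
            print('%s\t%d' % (current, total))
        current, total = word, 0
    total += int(count)
if current is not None:
    print('%s\t%d' % (current, total))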
Run the wordcount (first 100 words):
select transform(wc.word, wc.count) using 'python red.py' as w, c
from(
select transform(line) using 'python mapper.py' as word, count from docs cluster by word) wc limit 100;
Insert the results into the table:
hive> insert overwrite table word_count
> select transform(wc.word, wc.count) using 'python red.py' as w, c
> from(
> select transform(line) using 'python mapper.py' as word, count from docs cluster by word) wc;
Look up a word's frequency:
select * from word_count where word='the';