大数据学习笔记,
背景:在产品迭代缓慢的时期,对数据现状及未来发展做了综合评估: 1.利用mysql来进行凌晨及每小时的中间表生成的计划任务,效率变低,已经逐渐满足不了敏捷化的迭代需求; 2.mysql埋点库的数据量变大,存储方式采用分表存储,极大地降低数据利用效率(需要整合一张大表); 3.未雨绸缪,即使目前的工作需求在不断优化解决方案的同时仍然可以解决,但是考虑到产品有爆发的潜质,必须提前做好应对大数据量存储及利用的准备工作。决定:团队在支持业务现状的数据需求的同时,自主搭建及完善大数据平台存储计算及利用的工作。
准备:1.硬件配备:前期采用阿里云服务器,配置选择基础计算型,后期灵活调整。(主x1:8C16G 200高速云盘,从x2:4C8G 200高速云盘)2.环境配备:为降低开发成本,提高开发效率,直接租用阿里EMR(具备hadoop及配套组件的生态系统镜像)2.人员配备:2人
架构:
开发:
1.数据由mysql迁移至hadoop:
- 组件
(2)数据同步工具使用dataX还是sqoop:首先比较两者优势,前者由阿里开源,以json文件格式配置,稳定性较高,局限性较强;后者为hadoop自带组件,传输效率高,参数丰富,命令行配置较为方便,但是要注意数据倾斜问题的存在。使用中,选择DataX同步业务数据,选择Sqoop同步埋点数据;
(3)实践过程中sqoop由于外网连接mysql数据库,网络IO瓶颈导致导数超时,切换成内网连接则正常,另外经过两种导数工具实践对比,datax在数据量相对较少时效率高,sqoop则在大数据量传输时优势比较大。使用DataX同步业务数据:
- 同步方案
主要涉及到增量数据和更新数据
增量数据:每次同步的时候会记录当次最大自增主键值,下次同步时自增主键起始记录值从上次记录的最大主键值开始。
更新数据:每次同步时会记录下当次同步的时间,下次同步时更新时间起始记录值从上次记录的时间开始。
另:无主键的表一律每次先删除然后全量同步。
实践:最开始想的是使用阿里的开源组件canal结合binlog来实时记录下新增,删除,更新的数据,因为这样会更全面保存每条数据的操作记录,比如查询某个时间节点的记录是什么样的,但是实现过程中发现canal并非实 时监控到数据库的变更,延迟比较严重(可能还没研究透),另外这样会有一个弊端,当canal服务器挂掉以后,挂掉的这期间数据可能存在丢失。然而使用上面的方案可以保证宕机情况下数据的断点续传,只是更新 数据保存的时间节点力度变 粗,影响几乎不存在。因为业务库的数据变化比较大,还会有新增表,在程序实现时,会自动检测是否有新增表,如果有新增表,会自动在hive上创建对应的表实现同步,而不需要人为去干涉。
埋点数据:
不会涉及到数据的变更,使用sqoop增量同步。
- datax同步
(2)配置json文件(示例)hive -e "
CREATE EXTERNAL TABLE `count_operation`(
`id` string,
`user_id` string,
`from_user_id` string,
`create_user_id` string,
`group_id`string,
`group_new_type` string,
`port` string,
`operation` string,
`remark`string,
`user_register_time` string,
`user_last_login_time` string,
`create_time` string
)
COMMENT 'maidian operate log'
PARTITIONED BY (
`dt` string,
`hour` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION
'/yinian/bigdata/count_operation'
"
(3)运行job作业{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["`id`","`jumpid`","`bannerurl`","`bannertime`","`remark`","`bannerstatus`","`create_time`","`update_time`"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://10.29.217.13:3306/yinian"
],
"table": [
"`activitibanner`"
]
}
],
"password": "******",
"username": "biuser6",
"where": "id>1"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":"`id`","type":"int"},{"name":"`jumpid`","type":"int"},{"name":"`bannerurl`","type":"string"},{"name":"`bannertime`","type":"string"},{"name":"`remark`","type":"string"},{"name":"`bannerstatus`","type":"int"},{"name":"`create_time`","type":"string"},{"name":"`update_time`","type":"string"}
],
"compress": "GZIP",
"defaultFS": "hdfs://emr-header-1.cluster-65705:9000",
"fieldDelimiter":",",
"fileName": "activitibanner",
"fileType": "text",
"path": "/user/hive/warehouse/yinian.db/activitibanner/day=20180626/hour=18",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "30"
}
}
}
}
python datax.py ../job/mysqlreader-hdfswriter.json
(4)挂载分区路径至hive
hive -e "alter table yinian_count.count_operation add if not exists partition (dt='${V_DT}',hour='${V_HOUR}');"
- 使用Sqoop同步埋点数据
(2)同步至hdfs文件hive -e "
CREATE EXTERNAL TABLE `count_operation`(
`id` string,
`user_id` string,
`from_user_id` string,
`create_user_id` string,
`group_id`string,
`group_new_type` string,
`port` string,
`operation` string,
`remark`string,
`user_register_time` string,
`user_last_login_time` string,
`create_time` string
)
COMMENT 'maidian operate log'
PARTITIONED BY (
`dt` string,
`hour` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION
'/yinian/bigdata/count_operation'
"
(3)挂载分区路径至hivesqoop import \
--connect jdbc:mysql://10.29.217.13:3306/yinian_count \
--username biuser \
--password '*******' \
--query "select * from count_operation where operation<>'pv' and
create_time>=FROM_UNIXTIME(${V_START},'%Y-%m-%d %H:%i:%S') and
create_time<FROM_UNIXTIME(${V_END},'%Y-%m-%d %H:%i:%S') and \$CONDITIONS" \
--target-dir /yinian/bigdata/count_operation/dt=${V_DT}/hour=${V_HOUR} \
--fields-terminated-by '\001' \
-m 1 \
--split-by 'id'
2.hive数据仓库建设:hive -e "alter table yinian_count.count_operation add if not exists partition (dt='${V_DT}',hour='${V_HOUR}');"
(1)hive sql 的写法及优化要注意,UDF函数的创建满足既定需求;
(2)这里的dw层实际当dm层在使用,dm层应该在hive上定义为过度表,用完即删;
hive -e "
insert into table yinian_dw.dw_bury_info
select regexp_replace(reflect('java.util.UUID', 'randomUUID'), '-', '')
,a.operation
,d.remark
,d.theme
,d.source
,a.group_new_type
,a.port
,a.user_count
,a.operation_count
,'operation'
,'小时'
,from_unixtime(unix_timestamp()-7200,'yyyy-MM-dd HH:00:00')
,current_timestamp()
from (select operation,group_new_type,port,count(distinct user_id) as user_count,count(id) as operation_count from yinian_count.count_operation where dt=from_unixtime(unix_timestamp()-7200,'yyyyMMdd') and hour=from_unixtime(unix_timestamp()-7200,'HH') group by operation,group_new_type,port) a
join yinian.dim_operation d on a.operation=d.operation and d.is_value=1;
"
3.数据由hive迁移至mysql,用于应用:
(1)注意字段分隔符要和建表的时候统一;
应用:sqoop export \
--connect jdbc:mysql://10.29.217.13:3306/dw_04_burypoint \
--username biuser \
--password '******' \
--table dw_bury_info \
--export-dir /user/hive/warehouse/yinian_dw.db/dw_bury_info \
--input-fields-terminated-by ',' \
--lines-terminated-by '\n' \
--update-key uuid \
--update-mode allowinsert;
使用python项目superset进行数据可视化(其中涉及到部分二开)
运维:1.定时任务:用python定时任务框架schedule运行shell或python脚本; (1)python执行命令行框架subprocess(python3)
subprocess.call
执行命令,返回状态码(命令正常执行返回0,报错则返回1)
ret1=subprocess.call("ifconfig")
ret2=subprocess.call("ipconfig") #python3.5不是这样,依然会抛出异常导致无法对ret2赋值
print(ret1) #0
print(ret2) #1
ret = subprocess.call(["ls", "-l"], shell=False) #shell为False的时候命令必须分开写
ret = subprocess.call("ls -l", shell=True)subprocess.check_call
执行命令,如果执行成功则返回状态码0,否则抛异常
subprocess.check_call(["ls", "-l"])
subprocess.check_call("exit 1", shell=True) subprocess.check_output
执行命令,如果执行成功则返回执行结果,否则抛异常
subprocess.Popen(...)
用于执行复杂的系统命令| 参数 | 注释 |
|---|---|
| args | shell命令,可以是字符串或者序列类型(如:list,元组) |
| bufsize | 指定缓冲。0 无缓冲,1 行缓冲,其他 缓冲区大小,负值 系统缓冲 |
| stdin, stdout, stderr | 分别表示程序的标准输入、输出、错误句柄 |
| preexec_fn | 只在Unix平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用 |
| close_sfs | 在windows平台下,如果close_fds被设置为True,则新创建的子进程将不会继承父进程的输入、输出、错误管道。所以不能将close_fds设置为True同时重定向子进程的标准输入、输出与错误(stdin, stdout, stderr)。 |
| shell | 同上 |
| cwd | 用于设置子进程的当前目录 |
| env | 用于指定子进程的环境变量。如果env = None,子进程的环境变量将从父进程中继承。 |
| universal_newlines | 不同系统的换行符不同,True -> 同意使用 \n |
| startupinfo | 只在windows下有效,将被传递给底层的CreateProcess()函数,用于设置子进程的一些属性,如:主窗口的外观,进程的优先级等等 |
| createionflags | 同上 |
(2)轻量级定时器schedule
import schedule 2 import time 3 4 def job(): 5 print("I'm working...") 6 7 schedule.every(10).minutes.do(job) 8 schedule.every().hour.do(job) 9 schedule.every().day.at("10:30").do(job) 10 schedule.every().monday.do(job) 11 schedule.every().wednesday.at("13:15").do(job) 12 13 while True: 14 schedule.run_pending() 15 time.sleep(1)
(3)python 脚本后台运行
nohup python3 mysql2hdfs_sche.py > schedule.log 2>&1 &
通过日志输出来做错误预警;
3.任务调度:
当任务量增多,采用zooie或Zeus做任务调度;
踩坑:
(1)用户创建文件目录,另一用户不能访问,需要修改hdfs文件所属用户等权限:
HDFS支持权限控制,但支持较弱。HDFS的设计是基于POSIX模型的,支持按用户、用户组、其他用户的读写执行控制权限。在linux命令行下,可以使用下面的命令修改文件的权限、文件所有者,文件所属组:
sudo addgroup Hadoop#添加一个hadoop组
sudo usermod -a -G hadoop larry#将当前用户加入到hadoop组
sudo gedit etc/sudoers#将hadoop组加入到sudoer
在root ALL=(ALL) ALL后 hadoop ALL=(ALL) ALL
修改hadoop目录的权限
sudo chown -R larry:hadoop /home/larry/hadoop<所有者:组 文件>
sudo chmod -R 755 /home/larry/hadoop
修改hdfs的权限
sudo bin/hadoop dfs -chmod -R 755 /
sudo bin/hadoop dfs -ls /
修改hdfs文件的所有者
sudo bin/hadoop fs -chown -R larry /
sudo bin/hadoop dfsadmin -safemode leave #解除hadoop的安全模式
hadoop fs -copyFromLocal <localsrc> URI#拷贝本地文件到hdfs
hadoop fs -cat file:///file3 /user/hadoop/file4#将路径指定文件的内容输出到stdout
hadoop fs -chgrp [-R] GROUP URI#改变文件的所属组
hadoop fs -chmod [-R] 755 URI#改变用户访问权限
hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ]#修改文件的所有者
hadoop fs -copyToLocal URI localdst#拷贝hdfs文件到本地
hadoop fs -cp URI [URI …] <dest>#拷贝hdfs文件到其它目录
hadoop fs -du URI [URI …]#显示目录中所有文件的大小
hadoop fs -getmerge <src> <localdst> [addnl]#合并文件到本地目录
(2)集群释放缓存echo 3 > /proc/sys/vm/drop_caches
(3)集群kill应用
yarn application -kill application_1520407159877_44880