[Hadoop] Programming Hadoop Streaming with Python
Hadoop Streaming is a programming tool provided by Hadoop that lets users use any executable or script as the Mapper and Reducer.
Multi-language programming with Hadoop Streaming
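1. Input is read from the standard input stream:
1) C++: cin
2) PHP: stdin
3) Python: sys.stdin
2. Output is written to the standard output stream:
1) C++: cout
2) PHP: echo
3) Python: print
3. The Mapper and Reducer can be implemented this way; other components (InputFormat, Partitioner, etc.) must be implemented in Java.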
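The contract in the list above (read records from standard input, write results to standard output) can be sketched in Python as follows. By default, Hadoop Streaming treats the text up to the first tab on each output line as the key and the rest as the value. The function name `run_mapper`, the choice of value, and the sample input are illustrative assumptions, not part of Hadoop Streaming; in a real job the two streams would be `sys.stdin` and `sys.stdout`.

```python
import io
import sys


def run_mapper(stream_in, stream_out):
    """Read text lines and emit one "key<TAB>value" line per input line.

    Hypothetical helper: the key is the line itself and the value is its
    length, chosen only to show the stdin -> stdout contract.
    """
    for line in stream_in:
        line = line.rstrip("\n")
        if not line:
            continue  # skip blank lines
        stream_out.write("%s\t%d\n" % (line, len(line)))


# Demo with in-memory streams; a streaming script would instead call
# run_mapper(sys.stdin, sys.stdout).
demo_out = io.StringIO()
run_mapper(io.StringIO("hello\n\nhadoop streaming\n"), demo_out)
sys.stdout.write(demo_out.getvalue())
```

Passing the streams as parameters keeps the logic testable without a cluster, which is the same idea as the local pipe test later in this post.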
Implementing WordCount in Python:
1. mapper.py
[root@vm wordcount]# vim mapper.py
Enter the following:
#!/usr/bin/python
import sys

# Emit "word<TAB>1" for every whitespace-separated word read from stdin.
for line in sys.stdin:
    # split() with no arguments already discards empty strings
    for word in line.split():
        print("%s\t%s" % (word, 1))
2. reducer.py
[root@vm wordcount]# vim reducer.py
Enter the following:
#!/usr/bin/python
import sys

word2count = {}
for line in sys.stdin:
    line = line.strip()
    try:
        # The mapper separates word and count with a tab
        word, count = line.split("\t", 1)
        count = int(count)
    except ValueError as err:
        # Report malformed records on stderr so they do not pollute the job output
        sys.stderr.write("%s\n" % err)
        continue
    word2count[word] = word2count.get(word, 0) + count

# Emit the totals sorted by word
for word, count in sorted(word2count.items()):
    print("%s\t%s" % (word, count))
3. Prepare a test file, test.txt
[root@vm wordcount]# vim test.txt
Enter the following:
this is a test
this is a test
this is a test
this is a test
4. Local test
[root@vm wordcount]# cat test.txt |python mapper.py |sort|python reducer.py
a 4
is 4
test 4
this 4
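The shell pipeline above mirrors what the cluster does: map, sort by key, reduce. A minimal sketch of the same three phases in plain Python follows. The function names `map_words` and `reduce_counts` are illustrative assumptions, and the reducer here uses `itertools.groupby` rather than the dictionary used in reducer.py, to show why the sort step matters: once pairs are sorted, identical keys are adjacent and can be summed one run at a time.

```python
from itertools import groupby
from operator import itemgetter


def map_words(lines):
    """Mapper phase: yield a (word, 1) pair for every word."""
    for line in lines:
        for word in line.split():
            yield word, 1


def reduce_counts(sorted_pairs):
    """Reducer phase: sum the counts of each run of identical keys.

    Relies on the pairs being sorted by key, which is what `sort` does
    locally and what the shuffle phase does on the cluster.
    """
    for word, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield word, sum(count for _, count in group)


lines = ["this is a test"] * 4      # same content as test.txt
pairs = sorted(map_words(lines))    # stands in for `| sort |`
result = list(reduce_counts(pairs))
for word, count in result:
    print("%s\t%d" % (word, count))
```

Because the grouped version only holds one key's counts at a time, it may suit inputs with many distinct words better than accumulating everything in a dictionary.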
5. Run on the cluster
Before running on the cluster, upload the local test file to HDFS:
[root@vm wordcount]# hadoop fs -mkdir /user/root/wordcount
[root@vm wordcount]# hadoop fs -put test.txt /user/root/wordcount/
[root@vm wordcount]# hadoop fs -ls /user/root/wordcount/
Found 1 items
-rw-r--r-- 3 root root 60 2018-05-14 09:58 /user/root/wordcount/test.txt
Run the MapReduce job:
[root@vm wordcount]# hadoop jar /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/hadoop-streaming-2.6.0-cdh5.13.1.jar -D mapred.reduce.tasks=1 -mapper "python mapper.py" -reducer "python reducer.py" -file mapper.py -file reducer.py -input /user/root/wordcount/test.txt -output /user/root/wordcount/out
18/05/14 10:00:37 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.py, reducer.py] [/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/hadoop-streaming-2.6.0-cdh5.13.1.jar] /tmp/streamjob7327942722840197442.jar tmpDir=null
18/05/14 10:00:39 INFO mapred.FileInputFormat: Total input paths to process : 1
18/05/14 10:00:39 INFO mapreduce.JobSubmitter: number of splits:2
18/05/14 10:00:39 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
18/05/14 10:00:40 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1526119303078_0004
18/05/14 10:00:40 INFO impl.YarnClientImpl: Submitted application application_1526119303078_0004
18/05/14 10:00:40 INFO mapreduce.Job: The url to track the job: http://g13-1.novalocal:8088/proxy/application_1526119303078_0004/
18/05/14 10:00:40 INFO mapreduce.Job: Running job: job_1526119303078_0004
18/05/14 10:00:46 INFO mapreduce.Job: Job job_1526119303078_0004 running in uber mode : false
18/05/14 10:00:46 INFO mapreduce.Job: map 0% reduce 0%
18/05/14 10:00:50 INFO mapreduce.Job: map 100% reduce 0%
18/05/14 10:00:56 INFO mapreduce.Job: map 100% reduce 100%
18/05/14 10:00:57 INFO mapreduce.Job: Job job_1526119303078_0004 completed successfully
18/05/14 10:00:57 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=469610
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=290
HDFS: Number of bytes written=23
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Rack-local map tasks=2
Total time spent by all maps in occupied slots (ms)=5834
Total time spent by all reduces in occupied slots (ms)=3080
Total time spent by all map tasks (ms)=5834
Total time spent by all reduce tasks (ms)=3080
Total vcore-milliseconds taken by all map tasks=5834
Total vcore-milliseconds taken by all reduce tasks=3080
Total megabyte-milliseconds taken by all map tasks=5974016
Total megabyte-milliseconds taken by all reduce tasks=3153920
Map-Reduce Framework
Map input records=4
Map output records=16
Map output bytes=92
Map output materialized bytes=104
Input split bytes=200
Combine input records=0
Combine output records=0
Reduce input groups=4
Reduce shuffle bytes=104
Reduce input records=16
Reduce output records=4
Spilled Records=32
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=235
CPU time spent (ms)=3290
Physical memory (bytes) snapshot=1292472320
Virtual memory (bytes) snapshot=8453484544
Total committed heap usage (bytes)=1889533952
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=90
File Output Format Counters
Bytes Written=23
18/05/14 10:00:57 INFO streaming.StreamJob: Output directory: /user/root/wordcount/out
At this point the result can be viewed in the Web UI. To view it from the command line:
[root@vm wordcount]# hadoop fs -cat /user/root/wordcount/out/part-00000
a 4
is 4
test 4
this 4
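The job log in step 5 reports two deprecation notices: `-file` should be replaced by the generic option `-files`, and `mapred.reduce.tasks` by `mapreduce.job.reduces`. The sketch below assembles the same job with those replacements. The helper name `build_streaming_command` is a hypothetical convenience, not a Hadoop API, and this variant was not run on the cluster shown above; the jar path and HDFS paths are copied from that run.

```python
import shlex


def build_streaming_command(jar, mapper, reducer, input_path, output_path, reduces=1):
    """Return the argv list for a Hadoop Streaming job (hypothetical helper)."""
    return [
        "hadoop", "jar", jar,
        # Generic options (-D, -files) go before the streaming-specific ones.
        "-D", "mapreduce.job.reduces=%d" % reduces,
        "-files", "%s,%s" % (mapper, reducer),
        "-mapper", "python %s" % mapper,
        "-reducer", "python %s" % reducer,
        "-input", input_path,
        "-output", output_path,
    ]


cmd = build_streaming_command(
    jar="/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/hadoop-streaming-2.6.0-cdh5.13.1.jar",
    mapper="mapper.py",
    reducer="reducer.py",
    input_path="/user/root/wordcount/test.txt",
    output_path="/user/root/wordcount/out",
)
# Print a copy-pastable shell command; the list could also be passed to subprocess.
print(" ".join(shlex.quote(arg) for arg in cmd))
```

The output directory must not already exist when the job starts, so the earlier `out` directory would need to be removed or a different path chosen before rerunning.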
Official documentation: https://hadoop.apache.org/docs/r1.0.4/cn/streaming.html