Hadoop二次排序,
我想涉及到文件的Join操作应该都要使用到二次排序吧,之前我用字符串拼接的方法显得太不专业了,本来在reduce过程中是不需要保存这些数据的,遍历一次便可以将记录全部collect好。Hadoop 0.20包里面有一个SecondarySort的例子程序,结合公司牛人写的一个ppt,终于搞明白了。呵呵,刚好也用上了,所以总结一下。
Hadoop提供了几种默认类型如果Text,LongWritable等,它们都实现了WritableComparable接口,因此具有比较和序列化的能力。要实现二次排序,我想大概有两种办法。第一种是使用自定义类型,该类型实现WritableComparable接口,给原始数据添加一个权值,通过权值来改变排序的结果;第二种方法是给记录的key做一些不同的标记,比如有些在最前面加上一个’+'前缀,另一些前面加上’-'前缀,通过这些前缀来决定排序的规则。这两种办法都要实现自己的分区函数和分组函数,因为key已经被改变了,显然第一种方法感觉要专业一点,但是第二种方法感觉要高效一些,因为不需要类来封装。
我使用了第一种方法来实现二次排序,需求是做一个一对多的文件连接。来一个形象的比喻,比如一个人去商场买东西,他推着购物车,每个商品都有自己唯一的编号。因此数据有两部分组成:
1、用户对商品编号,这是一对多的。数据保存在base.dat文件中。
2、商品编号对商品的信息,其中包括商品的价格,这是一对一的。数据保存在spu.dat文件中。
最后要生成用户对应商品价格记录,这样就可以统计出用户购买商品的总价格。这两个文件通过商品的编号连接。
程序很简单,自己定义了一个UserKey类,在这个类封装了原始数据,另外添加一个权重属性,排序时需要将商品对商品价格排到最前面去。注意这个compareTo方法返回值的涵义,返回-1表示自己要排在比较的记录之前,返回1表示自己要排在比较的记录之前,之前我一直以为返回1表示大于的意思,结果程序就出现了奇怪的现象。Hadoop没有使用Java默认的序列化方式,用户必须负责自定义类型的序列化接口的实现,我感觉下面的程序写得不够专业,这是我比较惯用的序列化手段,如果使用SequenceFileOutputFormat保存输出结果,可以看到对象序列化后的数据的保存方式,不过Java虚拟机统一了数据格式,因此不能使用C/C++的思维来观察这些数据,但是也差不多了。
全部源代码如下:
?001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
package
taobao;
import
java.io.DataInput;
import
java.io.DataOutput;
import
java.io.IOException;
import
java.util.*;
import
org.apache.hadoop.fs.FileSystem;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.conf.*;
import
org.apache.hadoop.io.*;
import
org.apache.hadoop.mapred.*;
import
org.apache.hadoop.mapred.lib.InverseMapper;
import
org.apache.hadoop.mapred.lib.MultipleOutputs;
import
org.apache.hadoop.util.*;
public
class
CK {
public
static
class
Key implements
WritableComparable<Key> {
public
String id = "" ;
public
short
weight;
public
Key() {
}
public
Key(String i, short
j) {
id
= i;
weight
= j;
}
@Override
public
void
readFields(DataInput in) throws
IOException {
//
先写字符串的长度信息
int
length = in.readInt();
byte []
buf = new
byte [length];
in.readFully(buf,
0 ,
length);
//
得到id号
id
= new
String(buf);
//
得到权值
weight
= in.readShort();
}
@Override
public
void
write(DataOutput out) throws
IOException {
System.out.println( "in
write" );
String
str = id.toString();
int
length = str.length();
byte []
buf = str.getBytes();
//
先写字符串长度
//
WritableUtils.writeVInt(out, length);
out.writeInt(length);
//
再写字符串数据
out.write(buf,
0 ,
length);
//
接着是权值
out.writeShort(weight);
}
@Override
public
int
hashCode() {
return
id.hashCode();
}
@Override
public
String toString() {
return
id;
}
@Override
public
boolean
equals(Object right) {
System.out.println( "in
equals" );
//
只要id相等就认为两个key相等
if
(right instanceof
Key) {
Key
r = (Key) right;
return
r.id.equals(id);
}
else
{
return
false ;
}
}
@Override
public
int
compareTo(Key k) {
System.out.println( "in
compareTo, key="
+ k.toString());
//
先比较value id
int
cmp = id.compareTo(k.id);
if
(cmp != 0 )
{
return
cmp;
}
//
如果value id相等,再比较权值
if
(weight > k.weight)
return
- 1 ;
else
if
(weight < k.weight)
return
1 ;
else
return
0 ;
}
}
public
static
class
Map extends
MapReduceBase implements
Mapper<LongWritable,
Text, Key, Text> {
public
void
map(LongWritable l, Text value,
OutputCollector<Key,
Text> output, Reporter reporter)
throws
IOException {
String
line = value.toString();
String[]
pair = line.split( "\t" );
if
(pair.length == 3 )
{
//key->商品编号,value->购买者
Key
k = new
Key(pair[ 1 ],
( short )
0 );
output.collect(k,
new
Text(pair[ 0 ]));
}
else
{
//key->商品编号,value->商品价格
Key
k = new
Key(pair[ 0 ],
( short )
1 );
output.collect(k,
new
Text(pair[ 1 ]));
}
}
}
public
static
class
Reduce extends
MapReduceBase implements
Reducer<Key,
Text, Text, Text> {
public
void
reduce(Key key, Iterator<Text> values,
OutputCollector<Text,
Text> output, Reporter reporter)
throws
IOException {
//
此处一定要new出一个新对象来,否则结果不会正确
//
这都是Java引用导致的问题
Text
second = new
Text(values.next());
while
(values.hasNext()) {
output.collect(values.next(),
second);
}
}
}
public
static
class
FirstGroupingComparator implements
RawComparator<Key> {
@Override
public
int
compare( byte []
b1, int
s1, int
l1, byte []
b2, int
s2, int
l2) {
return
WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8 ,
b2,
s2, Integer.SIZE / 8 );
}
//
完全根据id来分区
@Override
public
int
compare(Key o1, Key o2) {
System.out.println( "in
group compare" );
String
l = o1.id.toString();
String
r = o2.id.toString();
int
res = l.compareTo(r);
System.out.println( "res="
+ res);
return
res;
}
}
public
static
class
FirstPartitioner implements
Partitioner<Key, Text> {
@Override
public
void
configure(JobConf job) {
//
TODO Auto-generated method stub
}
@Override
public
int
getPartition(Key key, Text value, int
numPartitions) {
System.out.println( "in
FirstPartitioner" );
return
Math.abs(key.id.hashCode()) % numPartitions;
}
}
public
static
void
main(String[] args) throws
Exception {
JobConf
conf = new
JobConf(CK. class );
conf.setJobName( "Composite
key" );
//
设置Map输出的key和value的类型
conf.setMapOutputKeyClass(Key. class );
conf.setMapOutputValueClass(Text. class );
//
设置Reduce输出的key和value的类型
conf.setOutputKeyClass(Text. class );
conf.setOutputValueClass(Text. class );
//
设置Mapper和Reducer
conf.setMapperClass(Map. class );
conf.setReducerClass(Reduce. class );
//
设置group函数和分区函数
conf.setOutputValueGroupingComparator(FirstGroupingComparator. class );
conf.setPartitionerClass(FirstPartitioner. class );
conf.setInputFormat(TextInputFormat. class );
conf.setOutputFormat(TextOutputFormat. class );
//
conf.setOutputFormat(SequenceFileOutputFormat.class);
//
如果输出目录已经存在,那么先将其删除
FileSystem
fstm = FileSystem.get(conf);
Path
outDir = new
Path(args[ 1 ]);
fstm.delete(outDir,
true );
//
设置输入输出目录
FileInputFormat.setInputPaths(conf,
new
Path(args[ 0 ]));
FileOutputFormat.setOutputPath(conf,
outDir);
JobClient.runJob(conf);
}
}
|
数据文件内容如下。
?1 2 3 4 5 6 7 8 9 |
henshao@henshao-desktop:~$
hadoop fs -cat /ck/base.dat
henshao
1000 10:20
henshao
1001 11:05
baoniu
1000 13:12
baoniu
1002 14:10
henshao@henshao-desktop:~$
hadoop fs -cat /ck/spu.dat
1000
179
1001
218
1002
255
|
Job运行过程及生成数据结果如下所示:
?01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
henshao@henshao-desktop:~$
hadoop jar CK_0.7.jar taobao.CK /ck /ck_out_0.7
10/09/14
17:02:09 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
10/09/14
17:02:10 INFO mapred.FileInputFormat: Total input paths to process : 2
10/09/14
17:02:10 INFO mapred.JobClient: Running job: job_201009141201_0028
10/09/14
17:02:11 INFO mapred.JobClient: map 0% reduce 0%
10/09/14
17:02:18 INFO mapred.JobClient: map 33% reduce 0%
10/09/14
17:02:22 INFO mapred.JobClient: map 66% reduce 0%
10/09/14
17:02:23 INFO mapred.JobClient: map 100% reduce 0%
10/09/14
17:02:29 INFO mapred.JobClient: map 100% reduce 100%
10/09/14
17:02:30 INFO mapred.JobClient: Job complete: job_201009141201_0028
10/09/14
17:02:30 INFO mapred.JobClient: Counters: 16
10/09/14
17:02:30 INFO mapred.JobClient: File Systems
10/09/14
17:02:30 INFO mapred.JobClient: HDFS bytes read=126
10/09/14
17:02:30 INFO mapred.JobClient: HDFS bytes written=46
10/09/14
17:02:30 INFO mapred.JobClient: Local bytes read=132
10/09/14
17:02:30 INFO mapred.JobClient: Local bytes written=360
10/09/14
17:02:30 INFO mapred.JobClient: Job Counters
10/09/14
17:02:30 INFO mapred.JobClient: Launched reduce tasks=1
10/09/14
17:02:30 INFO mapred.JobClient: Launched map tasks=3
10/09/14
17:02:30 INFO mapred.JobClient: Data-local map tasks=3
10/09/14
17:02:30 INFO mapred.JobClient: Map-Reduce Framework
10/09/14
17:02:30 INFO mapred.JobClient: Reduce input groups=3
10/09/14
17:02:30 INFO mapred.JobClient: Combine output records=0
10/09/14
17:02:30 INFO mapred.JobClient: Map input records=7
10/09/14
17:02:30 INFO mapred.JobClient: Reduce output records=4
10/09/14
17:02:30 INFO mapred.JobClient: Map output bytes=112
10/09/14
17:02:30 INFO mapred.JobClient: Map input bytes=101
10/09/14
17:02:30 INFO mapred.JobClient: Combine input records=0
10/09/14
17:02:30 INFO mapred.JobClient: Map output records=7
10/09/14
17:02:30 INFO mapred.JobClient: Reduce input records=7
henshao@henshao-desktop:~$
hadoop fs -cat /ck_out_0.7/part-00000
henshao
179
baoniu
179
henshao
218
baoniu
255
|
Hadoop二次排序的理解。mapper生成的数据已经经过排序了,这些数据经过shuffle送到reducer。在交给reduce函数之前,记录需要经过排序和group。排序可以通过setOutputKeyComparatorClass进行干预,上面的例子程序中没有编写这个排序的类,它会使用Key的compareTo来排序。group通过setOutputValueGroupingComparator进行设置,它们的例子程序将商品编号相同的记录group到一起。
可以想象经过排序的结果如下:
?1 2 3 4 5 6 7 |
1000,1
179
1000,0
henshao
1000,0
baoniu
1001,1
218
1000,0
henshao
1002,1
255
1000,0
henshao
|
group的结果如下:
?1 2 3 |
1000
179 henshao baoniu
1001
218 henshao
1002
255 henshao
|