欢迎投稿

今日深度:

Hadoop二次排序,

Hadoop二次排序,


我想涉及到文件的Join操作应该都要使用到二次排序吧,之前我用字符串拼接的方法显得太不专业了,本来在reduce过程中是不需要保存这些数据的,遍历一次便可以将记录全部collect好。Hadoop 0.20包里面有一个SecondarySort的例子程序,结合公司牛人写的一个ppt,终于搞明白了。呵呵,刚好也用上了,所以总结一下。

Hadoop提供了几种默认类型如果Text,LongWritable等,它们都实现了WritableComparable接口,因此具有比较和序列化的能力。要实现二次排序,我想大概有两种办法。第一种是使用自定义类型,该类型实现WritableComparable接口,给原始数据添加一个权值,通过权值来改变排序的结果;第二种方法是给记录的key做一些不同的标记,比如有些在最前面加上一个’+'前缀,另一些前面加上’-'前缀,通过这些前缀来决定排序的规则。这两种办法都要实现自己的分区函数和分组函数,因为key已经被改变了,显然第一种方法感觉要专业一点,但是第二种方法感觉要高效一些,因为不需要类来封装。

我使用了第一种方法来实现二次排序,需求是做一个一对多的文件连接。来一个形象的比喻,比如一个人去商场买东西,他推着购物车,每个商品都有自己唯一的编号。因此数据有两部分组成:

1、用户对商品编号,这是一对多的。数据保存在base.dat文件中。

2、商品编号对商品的信息,其中包括商品的价格,这是一对一的。数据保存在spu.dat文件中。

最后要生成用户对应商品价格记录,这样就可以统计出用户购买商品的总价格。这两个文件通过商品的编号连接。

程序很简单,自己定义了一个UserKey类,在这个类封装了原始数据,另外添加一个权重属性,排序时需要将商品对商品价格排到最前面去。注意这个compareTo方法返回值的涵义,返回-1表示自己要排在比较的记录之前,返回1表示自己要排在比较的记录之前,之前我一直以为返回1表示大于的意思,结果程序就出现了奇怪的现象。Hadoop没有使用Java默认的序列化方式,用户必须负责自定义类型的序列化接口的实现,我感觉下面的程序写得不够专业,这是我比较惯用的序列化手段,如果使用SequenceFileOutputFormat保存输出结果,可以看到对象序列化后的数据的保存方式,不过Java虚拟机统一了数据格式,因此不能使用C/C++的思维来观察这些数据,但是也差不多了。

全部源代码如下:

?
001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 package taobao;   import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.*;   import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.InverseMapper; import org.apache.hadoop.mapred.lib.MultipleOutputs; import org.apache.hadoop.util.*;   public class CK {       public static class Key implements WritableComparable<Key> {         public String id = "";         public short weight;           public Key() {         }           public Key(String i, short j) {             id = i;             weight = j;         }           @Override         public void readFields(DataInput in) throws IOException {             // 先写字符串的长度信息             int length = in.readInt();               byte[] buf = new byte[length];               in.readFully(buf, 0, length);               // 得到id号             id = new String(buf);               // 得到权值             weight = in.readShort();         }           @Override         public void write(DataOutput out) throws IOException {             System.out.println("in write");                           String str = id.toString();               int length = str.length();               byte[] buf = str.getBytes();               // 先写字符串长度             // WritableUtils.writeVInt(out, length);             out.writeInt(length);               // 再写字符串数据             out.write(buf, 0, length);               // 接着是权值             out.writeShort(weight);         }           @Override         public int hashCode() {             return id.hashCode();         }           @Override         public String toString() {             return id;         }           @Override         public boolean equals(Object right) {                           System.out.println("in equals");               // 只要id相等就认为两个key相等             if (right instanceof Key) {                 Key r = (Key) right;                 return r.id.equals(id);             } else {                 return false;             }         }           @Override         public int compareTo(Key k) {               System.out.println("in compareTo, key=" + k.toString());               // 先比较value id             int cmp = id.compareTo(k.id);               if (cmp != 0) {                 return cmp;             }               // 如果value id相等,再比较权值             if (weight > k.weight)                 return -1;             else if (weight < k.weight)                 return 1;             else                 return 0;         }     }       public static class Map extends MapReduceBase implements             Mapper<LongWritable, Text, Key, Text> {           public void map(LongWritable l, Text value,                 OutputCollector<Key, Text> output, Reporter reporter)                 throws IOException {             String line = value.toString();               String[] pair = line.split("\t");               if (pair.length == 3) {                 //key->商品编号,value->购买者                 Key k = new Key(pair[1], (short) 0);                   output.collect(k, new Text(pair[0]));             } else {                 //key->商品编号,value->商品价格                 Key k = new Key(pair[0], (short) 1);                   output.collect(k, new Text(pair[1]));             }         }     }       public static class Reduce extends MapReduceBase implements             Reducer<Key, Text, Text, Text> {         public void reduce(Key key, Iterator<Text> values,                 OutputCollector<Text, Text> output, Reporter reporter)                 throws IOException {               // 此处一定要new出一个新对象来,否则结果不会正确             // 这都是Java引用导致的问题             Text second =  new Text(values.next());               while (values.hasNext()) {                 output.collect(values.next(), second);             }         }     }       public static class FirstGroupingComparator implements RawComparator<Key> {         @Override         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {             return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,                     b2, s2, Integer.SIZE / 8);         }           // 完全根据id来分区         @Override         public int compare(Key o1, Key o2) {             System.out.println("in group compare");             String l = o1.id.toString();             String r = o2.id.toString();             int res = l.compareTo(r);             System.out.println("res=" + res);             return res;         }     }       public static class FirstPartitioner implements Partitioner<Key, Text> {           @Override         public void configure(JobConf job) {             // TODO Auto-generated method stub         }           @Override         public int getPartition(Key key, Text value, int numPartitions) {             System.out.println("in FirstPartitioner");                           return Math.abs(key.id.hashCode()) % numPartitions;         }     }       public static void main(String[] args) throws Exception {           JobConf conf = new JobConf(CK.class);         conf.setJobName("Composite key");           // 设置Map输出的key和value的类型         conf.setMapOutputKeyClass(Key.class);         conf.setMapOutputValueClass(Text.class);           // 设置Reduce输出的key和value的类型         conf.setOutputKeyClass(Text.class);         conf.setOutputValueClass(Text.class);           // 设置Mapper和Reducer         conf.setMapperClass(Map.class);         conf.setReducerClass(Reduce.class);           // 设置group函数和分区函数         conf.setOutputValueGroupingComparator(FirstGroupingComparator.class);         conf.setPartitionerClass(FirstPartitioner.class);           conf.setInputFormat(TextInputFormat.class);         conf.setOutputFormat(TextOutputFormat.class);         // conf.setOutputFormat(SequenceFileOutputFormat.class);           // 如果输出目录已经存在,那么先将其删除         FileSystem fstm = FileSystem.get(conf);         Path outDir = new Path(args[1]);         fstm.delete(outDir, true);           // 设置输入输出目录         FileInputFormat.setInputPaths(conf, new Path(args[0]));         FileOutputFormat.setOutputPath(conf, outDir);           JobClient.runJob(conf);     } }

数据文件内容如下。

?
1 2 3 4 5 6 7 8 9 henshao@henshao-desktop:~$ hadoop fs -cat /ck/base.dat henshao 1000    10:20 henshao 1001    11:05 baoniu  1000    13:12 baoniu  1002    14:10 henshao@henshao-desktop:~$ hadoop fs -cat /ck/spu.dat 1000    179 1001    218 1002    255

Job运行过程及生成数据结果如下所示:

?
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 henshao@henshao-desktop:~$ hadoop jar CK_0.7.jar taobao.CK /ck /ck_out_0.7 10/09/14 17:02:09 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 10/09/14 17:02:10 INFO mapred.FileInputFormat: Total input paths to process : 2 10/09/14 17:02:10 INFO mapred.JobClient: Running job: job_201009141201_0028 10/09/14 17:02:11 INFO mapred.JobClient:  map 0% reduce 0% 10/09/14 17:02:18 INFO mapred.JobClient:  map 33% reduce 0% 10/09/14 17:02:22 INFO mapred.JobClient:  map 66% reduce 0% 10/09/14 17:02:23 INFO mapred.JobClient:  map 100% reduce 0% 10/09/14 17:02:29 INFO mapred.JobClient:  map 100% reduce 100% 10/09/14 17:02:30 INFO mapred.JobClient: Job complete: job_201009141201_0028 10/09/14 17:02:30 INFO mapred.JobClient: Counters: 16 10/09/14 17:02:30 INFO mapred.JobClient:   File Systems 10/09/14 17:02:30 INFO mapred.JobClient:     HDFS bytes read=126 10/09/14 17:02:30 INFO mapred.JobClient:     HDFS bytes written=46 10/09/14 17:02:30 INFO mapred.JobClient:     Local bytes read=132 10/09/14 17:02:30 INFO mapred.JobClient:     Local bytes written=360 10/09/14 17:02:30 INFO mapred.JobClient:   Job Counters 10/09/14 17:02:30 INFO mapred.JobClient:     Launched reduce tasks=1 10/09/14 17:02:30 INFO mapred.JobClient:     Launched map tasks=3 10/09/14 17:02:30 INFO mapred.JobClient:     Data-local map tasks=3 10/09/14 17:02:30 INFO mapred.JobClient:   Map-Reduce Framework 10/09/14 17:02:30 INFO mapred.JobClient:     Reduce input groups=3 10/09/14 17:02:30 INFO mapred.JobClient:     Combine output records=0 10/09/14 17:02:30 INFO mapred.JobClient:     Map input records=7 10/09/14 17:02:30 INFO mapred.JobClient:     Reduce output records=4 10/09/14 17:02:30 INFO mapred.JobClient:     Map output bytes=112 10/09/14 17:02:30 INFO mapred.JobClient:     Map input bytes=101 10/09/14 17:02:30 INFO mapred.JobClient:     Combine input records=0 10/09/14 17:02:30 INFO mapred.JobClient:     Map output records=7 10/09/14 17:02:30 INFO mapred.JobClient:     Reduce input records=7 henshao@henshao-desktop:~$ hadoop fs -cat /ck_out_0.7/part-00000 henshao 179 baoniu  179 henshao 218 baoniu  255

Hadoop二次排序的理解。mapper生成的数据已经经过排序了,这些数据经过shuffle送到reducer。在交给reduce函数之前,记录需要经过排序和group。排序可以通过setOutputKeyComparatorClass进行干预,上面的例子程序中没有编写这个排序的类,它会使用Key的compareTo来排序。group通过setOutputValueGroupingComparator进行设置,它们的例子程序将商品编号相同的记录group到一起。

可以想象经过排序的结果如下:

?
1 2 3 4 5 6 7 1000,1  179 1000,0  henshao 1000,0  baoniu 1001,1  218 1000,0  henshao 1002,1  255 1000,0  henshao

group的结果如下:

?
1 2 3 1000    179 henshao baoniu 1001    218 henshao 1002    255 henshao

www.htsjk.Com true http://www.htsjk.com/Hadoop/41536.html NewsArticle Hadoop二次排序, 我想涉及到文件的Join操作应该都要使用到二次排序吧,之前我用字符串拼接的方法显得太不专业了,本来在reduce过程中是不需要保存这些数据的,遍历一次便可以将记...
相关文章
    暂无相关文章
评论暂时关闭