欢迎投稿

今日深度:

Hadoop 表连接,

Hadoop 表连接,


连接不同来源的数据 ---------------------------------- 在真实的情况中,会出现从不同的源中获取数据.如:要知道某些国家引用的专利是否来自另一个国家.这时候就又要查看引用数据:cite75_99.txt 又要查看国家信息 apat63_99.txt 在数据库中,用连接查询即可.但在 hadoop 中会较麻烦.可用的办法有:
一.Reduce 的连接 在数据库中,要从多个表中查询出自己需要的信息,需要连接查询。连接查询需要一个用来做关联的字段。 hadoop 进行解析时也有时需要从多个文件中查询。它也可以通过一个关联字段进行关联。原理一样。 例如,现在有如下两份数据:
用户数据 user: 用户ID,姓名,手机 1,张三,135xxxxxxxx 2,李四,136xxxxxxxx 3,王五,137xxxxxxxx 4,赵六,138xxxxxxxx
订单数据 order: 用户ID,订单ID,金额,日期 3,A,13,2013-02-12 1,B,23,2013-02-14 2,C,16,2013-02-17 3,D,25,2013-03-12
hadoop 有一个名为 datajoin的 contrib 软件包对应的 jar 包是: contrib/datajoin/hadoop-版本-datajoin.jar.它作为数据联结的通用框架. 通常,我们使用 hadoop 时,在 map 过程中进行数据分类,在 reduce 中进行数据处理。所以,一般联结处理也是在 map 中把相关的数据放到对应的 reduce 中;然后在各个 reduce 中进行联结处理。 需要处理的数据称为:数据源,它可以是一个或多个文件;两个或多个数据源之间进行关联的字段叫作:组键;由于 hadoop 处理每条记录时不知道记录是从哪个文件来的,这给处理带来了不便,象数据库联结查询时可以明确的获得某个表中的某个字段的值。为实际类似的功能,hadoop 增加了一个叫作 "标签"的概念,就相当于每条记录是属于哪个数据源。
拿上面数据为例,联结的过程如下:
----------------------------------------------------------------------- user order 1,张三,135xxxxxxxx 3,A,13,2013-02-12 2,李四,136xxxxxxxx 1,B,23,2013-02-14 3,王五,137xxxxxxxx 2,C,16,2013-02-17 4,赵六,138xxxxxxxx 3,D,25,2013-03-12
-------------------------------- map -----------------------------------
1-user-1,张三,135xxxxxxxx 3-order-3,A,13,2013-02-12 2-user-2,李四,136xxxxxxxx 1-order-1,B,23,2013-02-14 3-user-3,王五,137xxxxxxxx 2-order-2,C,16,2013-02-17 4-user-4,赵六,138xxxxxxxx 3-order-3,D,25,2013-03-12
------------------------------ shuffle ----------------------------------
1-user-1,张三,135xxxxxxxx 3-user-3,王五,137xxxxxxxx 1-order-1,B,23,2013-02-14 3-order-3,A,13,2013-02-12 3-order-3,D,25,2013-03-12

2-user-2,李四,136xxxxxxxx 4,赵六,138xxxxxxxx 2-order-2,C,16,2013-02-17
------------------------------ reduce 关联 ----------------------------------
可以看到,在 shuffle 的过程中,根据组键,将数据分发到不同的 reduce 中。然后每条记录会通过一个标签记着自己是 user 还是 order.
下面是 reduce 关联的细节, 以其中组键为 3 的数据为例:
-----------------------------------------------------------------------
3-user-3,王五,137xxxxxxxx 3-order-3,A,13,2013-02-12 3-order-3,D,25,2013-03-12
------------------------------ reduce  ----------------------------------
user-3,王五,137xxxxxxxx user-3,王五,137xxxxxxxx order-3,A,13,2013-02-12 order-3,D,25,2013-03-12
------------------------------ 合并  ----------------------------------
3,王五,137xxxxxxxx,A,13,2013-02-12 3,王五,137xxxxxxxx,D,25,2013-03-12
合并过程是将不同的标签的数据进行两两组合,任意两个不同标签的数据只能组合一次。相同标签的数据不组合。

================================================== 联结查询实例: 一.整理数据 user: 1,张三,135xxxxxxxx 2,李四,136xxxxxxxx 3,王五,137xxxxxxxx 4,赵六,138xxxxxxxx
order 3,A,13,2013-02-12 1,B,23,2013-02-14 2,C,16,2013-02-17 3,D,25,2013-03-12
将两个文件上传到 HDFS: [root@localhost bin]# ./hadoop fs -mkdir ./input/ [root@localhost bin]# ./hadoop fs -put user ./input/ [root@localhost bin]# ./hadoop fs -put order ./input/ [root@localhost bin]# ./hadoop fs -lsr . drwxr-xr-x   - root supergroup          0 2013-09-10 22:22 /user/root/input -rw-r--r--   1 root supergroup         72 2013-09-10 22:22 /user/root/input/order -rw-r--r--   1 root supergroup         84 2013-09-10 22:22 /user/root/input/user
二.JAVA代码示例:

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;
public class DataJoin extends Configured implements Tool {
public static class MapClass extends DataJoinMapperBase {
@Override protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); String[] tokens = line.split(","); String groupKey = tokens[0];
return new Text(groupKey); }
@Override protected Text generateInputTag(String inputFile) { String datasource = inputFile.split("-")[0];
return new Text(datasource); }
@Override protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; }
}
public static class Reduce extends DataJoinReducerBase {
@Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { if (tags.length < 2){ return null; } if(tags.length == 1 && ((Text)tags[0]).toString().endsWith("user")){ return null; }                         String retStr = "";            if(tags.length == 1 && ((Text)tags[0]).toString().endsWith("order")){                retStr = "NULL,";            } for(int i = 0; i < values.length; i++){                 if(i > 0){ retStr += ","; }                 TaggedWritable tw = (TaggedWritable) values[i]; // 结果是: 3,王五,137xxxxxxxx String line = ((Text) tw.getData()).toString(); // 结果是: {3,"王五,137xxxxxxxx"} String[] tokens = line.split(",", 2); retStr += tokens[1];             } TaggedWritable ret = new TaggedWritable(new Text(retStr));             ret.setTag((Text)tags[0]);                          return ret; } }
public static class TaggedWritable extends TaggedMapOutput { private Text data; public TaggedWritable(){   // 必须有,否则反序列化时出错 this.data = new Text("");        } public TaggedWritable(Text data){   this.data = data;        } @Override public void readFields(DataInput in) throws IOException { this.tag.readFields(in);             this.data.readFields(in); } @Override public void write(DataOutput out) throws IOException { this.tag.write(out);             this.data.write(out); } @Override public Writable getData() { return data; } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, DataJoin.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("DataJoin");        job.setMapperClass(MapClass.class);        job.setReducerClass(Reduce.class);                job.setInputFormat(TextInputFormat.class);        job.setOutputFormat(TextOutputFormat.class);                job.setOutputKeyClass(Text.class);        job.setMapOutputKeyClass(Text.class);                job.setMapOutputValueClass(TaggedWritable.class);                job.setOutputValueClass(Text.class);                job.set("mapred.textoutputformat.separator", ",");          JobClient.runJob(job);  return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new DataJoin(), args); System.exit(res); }
}
三.编译和运行 [root@localhost /]# cd /home/hadoop/sunyutest/ [root@localhost sunyutest]# mkdir datajoin/ [root@localhost sunyutest]# mkdir -p datajoin/src/ [root@localhost sunyutest]# mkdir -p datajoin/classes/ 将上面的 JAVA 代码放入 datajoin/src/ 下
将hadoop-datajoin-1.2.0.jar解压 tar xf hadoop-datajoin-1.2.0.jar 将 org 目录放到 datajoin/classes 下
编译: [root@localhost sunyutest] javac -classpath /home/hadoop/hadoop-core-1.2.0.jar:/home/hadoop/contrib/datajoin/hadoop-datajoin-1.2.0.jar -d datajoin/classes/ datajoin/src/DataJoin.java
打包 JAR: [root@localhost sunyutest]# jar -cf datajoin/datajoin.jar -C datajoin/classes/ . Hadoop 执行 JAR [root@localhost sunyutest]# /home/hadoop/bin/./hadoop jar datajoin/datajoin.jar DataJoin /user/root/input/ /user/root/joinoutput/
要注意, user.txt 和 order.txt 里面不要有空行.不然运行时会报错: java.lang.NullPointerException
运行结果: [root@localhost bin]# ./hadoop fs -cat ./test/part-00000 1,B,23,2013-02-14,张三,135xxxxxxxx 2,C,16,2013-02-17,李四,136xxxxxxxx 3,A,13,2013-02-12,王五,137xxxxxxxx 3,D,25,2013-03-12,王五,137xxxxxxxx

www.htsjk.Com true http://www.htsjk.com/Hadoop/40894.html NewsArticle Hadoop 表连接, 连接不同来源的数据----------------------------------在真实的情况中,会出现从不同的源中获取数据.如:要知道某些国家引用的专利是否来自另一个国家.这时候就又要查看引用数...
相关文章
    暂无相关文章
评论暂时关闭