欢迎投稿

今日深度:

在MongoDB上使用Spark

在MongoDB上使用Spark


【前言】Nosql技术只掌握了MongoDB。看到一篇文章介绍如何在MongoDB上使用Spark,赶紧翻译过来学习,提高一点核心竞争力。原文http://codeforhire.com/2014/02/18/using-spark-with-mongodb/

【正文】

在MongoDB上使用Spark

发布于 2014.02.18 作者 Sampo N

我最近开始研究Apache Spark作为数据挖掘框架。Spark建立在Apache Hadoop之上,它能够实施除Map-Reduce外更多的操作。同样它支持用迭代算法处理流数据。

既然Spark是基于Hadoop和HDFS,那么它就适于任何HDFS的数据源。我们的服务器使用了MongoDB,因而我们自然选择了mongo-hadoop 连接器,可以用它来实现从MongoDB上读写数据。

然而,这样做距离我们搞清楚如何配置、使用mongo-hadoop + spark还很远(至少对Spark入门者)。经过一番试验,以及令人沮丧的过程,以及向spark用户邮件列表发邮件咨询,我最终在Java和Scala环境上获得了成功。现在我写出这篇教程来解救大家。

仔细阅读以下内容,伸手党可以看这里:应用的样例代码

版本和APIs

Hadoop生态中充斥着各种不同的库,他们之间可能存在的APIs冲突会让人抓狂。主要的API变化在Hadoop 0.20。在这个版本中,老的org.apache.hadoop.mapred API变成了org.apache.hadoop.mapreduce API。API变化反过来影响了这些库:mongo-hadoop的包com.mongodb.hadoop.mapred变成com.mongodb.hadoop,同时SparkContext包含了方法hadoopRDD和newAPIHadoopRDD。

你需要小心选择出每个API的正确版本。这让事情更为复杂,因为在大多数情况下两个API的类名完全相同,只有包名不同。如果你碰到了谜一般的错误,再次检查一下使用的API是一致的。

样例使用了Hadoop 2.2.0 和新API。

库依赖

Apache Spark依赖于多个支撑库从Apache Commons和Hadoop 到 slf4j和Jetty。不要自己管理这些库依赖,使用Maven,Ivy,SBT或其他版本构建工具。

样例使用了SBT加载Akka Maven 仓库。这个Maven仓库包含了针对不同Hadoop版本的mongo-hadoop连接器,但是没有2.2.0的。因此单独添加了mongo-hadoop连接器。

在spark中使用mongo-hadoop

mongo-hadoop配置参数使用配置对象(从Hadoop包中获得)传递。最重要的参数是mongo.input.uri和mongo.output.uri,这个参数提供了MongoDB主机、端口、鉴权、db和collection名字。你也可以提供其他的配置选项,例如Mong查询语句用来限制输出数据。

每一个Mongo Collection分别作为独立的RDD载入,载入用的sparkcontext:

JavaPairRDD rdd = sc.newAPIHadoopRDD(config, MongoInputFormat.class, Object.class, BSONObject.class);

这里用了新的API,并且MongoInputFormat必须从com.mongodb.hadoop导入。对于旧的API,你应该使用hadoopRDD方法和com.mongodb.hadoop.mapred.MongoInputFormat。

返回类型是RDD,它的第一个参数是MongoDB文档中的的ObjectId实例,它的第二个参数包含了BSON文档。

保存RDD到MongoDB,使用了saveAsNewAPIHadoopFile方法:

rdd.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

 

只有最后两个参数看起来相关(虽然第一个参数必须是合法HDFS的URI)。RDD同样是RDD类型,然而,有个bug,第一个参数不能是ObjectId。如果你想指定ObjectId,就用一个String对象表示。如果你想让Mongo驱动自动生成ID,把第一个参数设成null(样例就是这么做的)。

样例app

样例应用 包含简单的单词计数算法,既有java的也有scala的。他们从MongoDB的beowulf.input collection中读出数据,在本地运行。(MongoDB的)文档只包含文字域,计数算法在文字域上工作。

结果存储在同样的beowulf库中,collection为output,文档包含word(文章中的单词)域和计数域。

样例要求MongoDB运行在本地,Scala版本2.10,SBT安装。然后你可以导入样例数据,运行程序,输出结果,使用如下的命令:

mongoimport -d beowulf -c input beowulf.json
sbt 'run-main JavaWordCount'
sbt 'run-main ScalaWordCount'
mongo beowulf --eval 'printjson(db.output.find().toArray())' | less

www.htsjk.Com true http://www.htsjk.com/DB2/20363.html NewsArticle 在MongoDB上使用Spark 【前言】Nosql技术只掌握了MongoDB。看到一篇文章介绍如何在MongoDB上使用Spark,赶紧翻译过来学习,提高一点核心竞争力。原文http://codeforhire.com/2014/02/18/using-spark-with...
相关文章
    暂无相关文章
评论暂时关闭