Spark 2.0 + Cassandra: Implementing a REST API Service

In this post, I will show how to use Akka HTTP together with Spark and Cassandra to implement a REST service, with Cassandra serving as the data store. We have already seen what Spark can do on its own; combined properly with Cassandra it enables even more powerful systems. Start by creating a build.sbt file with the following content:

name := "cassandra-spark-akka-http-starter-kit"

version := "1.0"

scalaVersion := "2.11.8"

organization := "com.iteblog"

val akkaV = "2.4.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "2.0.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.0.0",
  "com.typesafe.akka" %% "akka-http-core" % akkaV,
  "com.typesafe.akka" %% "akka-http-experimental" % akkaV,
  "com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
  "org.scalatest" %% "scalatest" % "2.2.6" % "test",
  "com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.0-M3",
  "net.liftweb" % "lift-json_2.11" % "2.6.2"
)

assembleArtifact in assemblyPackageScala := false

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

ivyScala := ivyScala.value map {
  _.copy(overrideScalaVersion = true)
}

fork in run := true

Above, we set assembleArtifact in assemblyPackageScala to false because Spark already ships the Scala library, so we do not need to bundle it again.
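Note that the assembly* keys above come from the sbt-assembly plugin, which is not part of sbt itself. A minimal project/plugins.sbt sketch; the version shown is an assumption, use whichever release matches your sbt installation:

// project/plugins.sbt -- required for the assembly* keys used in build.sbt
// (the version is an assumption; pick a release compatible with your sbt)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")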

Defining the User case class

The User class holds only an id, a name, and an email; it is defined as follows:

package com.iteblog.domain

case class User(id: String, name: String, email: String)
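The connector calls used below (saveToCassandra and cassandraTable) map the case class fields onto columns of an existing table, so the schema has to be created up front. A possible CQL sketch; the keyspace and table names iteblog and user are placeholders, they only have to match your application.conf:

-- hypothetical schema; names must match the configured keyspace/tableName
CREATE KEYSPACE IF NOT EXISTS iteblog
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS iteblog.user (
  id    text PRIMARY KEY,
  name  text,
  email text
);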

The data access layer

The following snippet implements the data access layer:

package com.iteblog.factories

import com.iteblog.domain.User
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._

import scala.util.Try

trait DatabaseAccess {

  import Context._

  def create(user: User): Boolean =
    Try(sc.parallelize(Seq(user)).saveToCassandra(keyspace, tableName)).toOption.isDefined

  // Use the connector's parameterized where clause rather than string
  // interpolation, so the id value cannot break or inject into the CQL.
  def retrieve(id: String): Option[Array[User]] =
    Try(sc.cassandraTable[User](keyspace, tableName).where("id = ?", id).collect()).toOption
}

object DatabaseAccess extends DatabaseAccess

object Context {
  val config = ConfigFactory.load()
  val url = config.getString("cassandra.url")
  val sparkConf: SparkConf = new SparkConf()
    .setAppName("Spark-cassandra-akka-rest-example")
    .setMaster("local[4]")
    .set("spark.cassandra.connection.host", url)
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val sc = spark.sparkContext
  val keyspace = config.getString("cassandra.keyspace")
  val tableName = config.getString("cassandra.tableName")
}
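Context resolves cassandra.url, cassandra.keyspace and cassandra.tableName through ConfigFactory.load(), and the startup code later reads http.interface and http.port, so all five keys must exist in src/main/resources/application.conf. A minimal sketch; every value here is an assumption for a local setup:

# src/main/resources/application.conf -- hypothetical local values
cassandra {
  url = "127.0.0.1"
  keyspace = "iteblog"
  tableName = "user"
}

http {
  interface = "0.0.0.0"
  port = 8080
}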

The service layer

Below is the implementation of the routes:

package com.iteblog.routes

import java.util.{Date, UUID}

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.ActorMaterializer
import com.iteblog.domain.User
import com.iteblog.factories.DatabaseAccess
import net.liftweb.json._
import net.liftweb.json.Extraction._

trait SparkService extends DatabaseAccess {

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer
  val logger = Logging(system, getClass)

  implicit def myExceptionHandler =
    ExceptionHandler {
      case e: ArithmeticException =>
        extractUri { uri =>
          complete(HttpResponse(StatusCodes.InternalServerError,
            entity = s"Data is not persisted and something went wrong"))
        }
    }

  implicit val formats: Formats = new DefaultFormats {
    outer =>
    override val typeHintFieldName = "type"
    override val typeHints = ShortTypeHints(List(classOf[String], classOf[Date]))
  }

  val sparkRoutes: Route = {
    get {
      path("create" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
        complete {
          val documentId = "user::" + UUID.randomUUID().toString
          try {
            val user = User(documentId, name, email)
            val isPersisted = create(user)
            if (isPersisted) {
              HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
            } else {
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
          }
        }
      }
    } ~ path("retrieve" / "id" / Segment) { (listOfIds: String) =>
      get {
        complete {
          try {
            val idAsRDD: Option[Array[User]] = retrieve(listOfIds)
            idAsRDD match {
              case Some(data) =>
                HttpResponse(StatusCodes.OK,
                  entity = data.headOption.fold("")(x => compact(render(decompose(x)))))
              case None =>
                HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not fetched and something went wrong")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for ids : $listOfIds")
          }
        }
      }
    }
  }
}
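The retrieve branch serializes the first matching User with lift-json via compact(render(decompose(x))). A tiny standalone sketch of that serialization step, using made-up sample values:

import net.liftweb.json._
import net.liftweb.json.Extraction._

case class User(id: String, name: String, email: String)

object JsonDemo extends App {
  implicit val formats: Formats = DefaultFormats
  val user = User("user::42", "alice", "alice@iteblog.com")
  // decompose turns the case class into a JValue; compact(render(...))
  // prints it as {"id":"user::42","name":"alice","email":"alice@iteblog.com"}
  println(compact(render(decompose(user))))
}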

Starting the service

Finally, we need a class that starts the service, i.e. brings up an HTTP server that clients can call:

package com.iteblog

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.iteblog.routes.SparkService
import com.iteblog.factories.Context

class StartSparkServer(implicit val system: ActorSystem,
                       implicit val materializer: ActorMaterializer) extends SparkService {
  def startServer(address: String, port: Int) = {
    Http().bindAndHandle(sparkRoutes, address, port)
  }
}

object StartApplication extends App {
  StartApp
}

object StartApp {
  implicit val system: ActorSystem = ActorSystem("Spark-Cassandra-Service")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val server = new StartSparkServer()
  val config = Context.config
  val serverUrl = config.getString("http.interface")
  val port = config.getInt("http.port")
  server.startServer(serverUrl, port)
}
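Once the application is running (for example via sbt run, or by submitting the assembly jar), the two GET endpoints can be exercised with curl. The host, port, and the sample id below are assumptions based on the hypothetical application.conf shown earlier; substitute the id actually returned by the create call:

# create a user (the id is generated server-side and echoed back)
curl http://localhost:8080/create/name/alice/email/alice@iteblog.com

# fetch the user back as JSON; "user::42" is a placeholder id
curl http://localhost:8080/retrieve/id/user::42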
