欢迎投稿

今日深度:

Spark,


"""Smoke-test script for a local Spark 1.6.1 install.

Bootstraps SPARK_HOME onto sys.path, starts a local SparkContext,
loads a pipe-delimited transaction file into a DataFrame, runs a few
SQL queries against it, then tears the temp table down.
"""
import os
import sys
import json

# All input/output lives under C:\Temp for this demo.
os.chdir("C:\\Temp")

# Point at a local Spark install if the environment does not already.
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = "C:\\MyFolder\\spark-1.6.1-bin-hadoop2.6"

SPARK_HOME = os.environ['SPARK_HOME']

# Make the PySpark bundled with SPARK_HOME importable. NOTE: pyspark must
# only be imported AFTER these inserts — importing it at the top of the
# file would fail unless pyspark happened to be installed system-wide.
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

print(SPARK_HOME)

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

# Local mode, 4 worker threads.
sc = SparkContext('local[4]', 'pyspark')
sqlContext = SQLContext(sc)

# Load the pipe-delimited transaction file and split into columns.
lines = sc.textFile("C:\\Temp\\SC_Test.txt")
lines.count()  # expected: 69
fields = lines.map(lambda x: x.split("|"))
fields.count()

# Column layout of SC_Test.txt (by position):
#   0 = transaction id, 2 = sender country, 3 = USD amount, 4 = USD margin.
# The monetary columns are cast to float so that SQL comparisons and
# aggregations below are numeric rather than relying on implicit
# string-to-number casts.
rows = fields.map(lambda p: Row(trans_id=p[0],
                                sndr_country=p[2],
                                usd_amt=float(p[3]),
                                usd_margin=float(p[4])))

trans = sqlContext.createDataFrame(rows)
trans.registerTempTable("trans")

# Transactions sent from Canada.
sql1 = sqlContext.sql("select trans_id, sndr_country from trans where sndr_country='CA'")
sql1.count()

# Transactions over 100 USD.
sql2 = sqlContext.sql("select trans_id, usd_amt from trans where usd_amt > 100 ")
sql2.count()
sql2.collect()

# Total USD amount per sender country.
sql3 = sqlContext.sql("select sndr_country, sum(usd_amt) from trans group by sndr_country ")
sql3.count()
sql3.collect()


"""
Load JSON files and Convert to dataframe
############# Import & Convert JSON to dataframe
fp = open('C:\\Temp\\ColumnConfig_6009.json','r')
data = json.loads(fp.read())

data=sqlContext.createDataFrame(data)
data.registerTempTable("data")
fp.close()

sql1=sqlContext.sql("select * from data ")
sql1.count() # 2
"""
sqlContext.dropTempTable("trans")
sc.stop()


www.htsjk.Com true http://www.htsjk.com/teradata/37235.html NewsArticle Spark, import osimport sysimport jsonimport pysparkos.chdir("C:\\Temp")if 'SPARK_HOME' not in os.environ: os.environ['SPARK_HOME']="C:\\MyFolder\\spark-1.6.1-bin-hadoop2.6"SPARK_HOME=os.environ['SPARK_HOME']sys.path.insert(0,os.path.join(...
相关文章
    暂无相关文章
评论暂时关闭