deploy, bulkLoader data, and query for titan 1.0 with cassandra cluster story,bulkloadertitan
搞了大约有半个月的titan, 只为了载入2亿点(4中类型 , 10个属性) , 10亿边(3中类型)的图数据,期间遇到了不少的坑,最终配置了cassandra集群,成功导入数据到titan1.0,并查询query
1 部署cassandra 集群
如果你的数据量不是很大,例如1000w点,2000w边,那么估计单击导入就可以,因此也不需要配置cassandra集群, 因为titan自己嵌入了cassandra, 直接 ./bin/titan.sh start 就可以启动了,默认的数据存储位置就在 db/data/ 下面
2 部署titan1.0
直接官网下载titan1.0版本,解压就能用 ,唯一需要的就是配置 jdk1.8版本 ,配置步骤如下:
$ sudo apt-get install software-properties-common
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer
之后 设置环境变量 export JAVA_HOME=/usr/lib/jvm/java-8-oracle
3 准备点和边的数据
我的数据是有databene-benerator生成的,格式如下
point_format: 逗号分割依次为 id,label,property1~property10
1,BankCard,Mf2Ei3Cm9El8Cz0Vt6Iq5Od1Uk3Hk4,Fu6Ph3Ij2Fq6Uh1,La5Rd7Kv4Ze7Xj8Gg2Cv0Cq,Nh1Ma4Di5Ig2Mc0Zn3Xs8Af0Do0Jp0,2054,594,Ir5Gw3Hf7Ty4Tc3Gi5Ki4Pn1Ze7Rn2,1,17675474023,225169216
edge_format:(adj形式)中间有个空格,前面为source,label, 后面为到达的dist,edge_label.
1,BankCard 13:recommend,14:terminal,37:recommend
意思是id为1的点(类型是BankCard,如果和点文件中点的类型不一致,则是两个点),到达id为13 14 37号点,有向边的类型分别得后面的英文.
4 点 和 边 数据解析文件(groovy文件)
点文件解析 文件名:point.groovy
def parse(line, factory) {def (id, vertextype, orderno, status, apptime, status2, hstyovertime, curtovertime, identify, vlabel, phonenum, bankcardnum) = line.split(/,/).toList()
def v1 = factory.vertex(id, vertextype)
if (orderno != null) { v1.property("orderNo", orderno) }
if (status != null) { v1.property("status", status) }
if (apptime != null) { v1.property("appTime", apptime) }
if (status2 != null) { v1.property("status2", status2) }
if (hstyovertime != null) { v1.property("historyOverdueTime", Integer.valueOf(hstyovertime)) }
if (curtovertime != null) { v1.property("currentOverdueTime", Integer.valueOf(curtovertime)) }
if (identify != null) { v1.property("Identification", identify) }
if (vlabel != null) { v1.property("vlabel", vlabel) }
if (phonenum != null) { v1.property("phoneNum", phonenum) }
if (bankcardnum != null) { v1.property("bankCardNum", bankcardnum) }
v1.property("bulkLoader.vertex.id",id)
return v1
}
边文件解析 文件名:edge.groovy
def parse(line, factory) {def parts = line.split(/ /)
def (src, vertextype) = parts[0].split(/,/).toList()def v1 = factory.vertex(src, vertextype)
if (parts.length == 2) {
parts[1].split(/,/).grep {!it.isEmpty() }.each {
def (dist, edgetype) = it.split(/:/).toList()
def v2 = factory.vertex(dist)
def edge = factory.edge(v1, v2, edgetype)
}
}
return v1
}
5 使用bulkLoader 批量导入文件: 文件名 bulkLoader.groovy
path = "/home/ubuntu/titan/data"
graph = TitanFactory.open("conf/titan-cassandra.properties")
mgmt = graph.openManagement()
// define the vertex label
if (!mgmt.containsVertexLabel("ApplyInfo")) {
ApplyInfo = mgmt.makeVertexLabel('ApplyInfo').make()
} else {
ApplyInfo = mgmt.getVertexLabel("ApplyInfo")
}
if (!mgmt.containsVertexLabel("Terminal")) {
Terminal = mgmt.makeVertexLabel('Terminal').make()
} else {
Terminal = mgmt.getVertexLabel("Terminal")
}
if (!mgmt.containsVertexLabel("BankCard")) {
BankCard = mgmt.makeVertexLabel('BankCard').make()
} else {
BankCard = mgmt.getVertexLabel("BankCard")
}
if (!mgmt.containsVertexLabel("Mobile")) {
Mobile = mgmt.makeVertexLabel('Mobile').make()
} else {
Mobile = mgmt.getVertexLabel("Mobile")
}
// define the edge label
if (!mgmt.containsEdgeLabel("terminal")) {
terminal = mgmt.makeEdgeLabel('terminal').multiplicity(MULTI).make()
} else {
terminal = mgmt.getEdgeLabel("terminal")
}
if (!mgmt.containsEdgeLabel("bankcard")) {
bankcard = mgmt.makeEdgeLabel('bankcard').multiplicity(MULTI).make()
} else {
bankcard = mgmt.getEdgeLabel("bankcard")
}
if (!mgmt.containsEdgeLabel("recommend")) {
recommend = mgmt.makeEdgeLabel('recommend').multiplicity(MULTI).make()
} else {
recommend = mgmt.getEdgeLabel("recommend")
}
// define the vertex property
// orderNo
if (!mgmt.containsPropertyKey("orderNo")) {
orderNo = mgmt.makePropertyKey('orderNo').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
orderNo = mgmt.getPropertyKey("orderNo")
}
// status
if (!mgmt.containsPropertyKey("status")) {
status = mgmt.makePropertyKey('status').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
status = mgmt.getPropertyKey("status")
}
// appTime
if (!mgmt.containsPropertyKey("appTime")) {
appTime = mgmt.makePropertyKey('appTime').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
appTime = mgmt.getPropertyKey("appTime")
}
// status2
if (!mgmt.containsPropertyKey("status2")) {
status2 = mgmt.makePropertyKey('status2').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
status2 = mgmt.getPropertyKey("status2")
}
// historyOverdueTime
if (!mgmt.containsPropertyKey("historyOverdueTime")) {
historyOverdueTime = mgmt.makePropertyKey('historyOverdueTime').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
historyOverdueTime = mgmt.getPropertyKey("historyOverdueTime")
}
// currentOverdueTime
if (!mgmt.containsPropertyKey("currentOverdueTime")) {
currentOverdueTime = mgmt.makePropertyKey('currentOverdueTime').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
currentOverdueTime = mgmt.getPropertyKey("currentOverdueTime")
}
// Identification
if (!mgmt.containsPropertyKey("Identification")) {
Identification = mgmt.makePropertyKey('Identification').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
Identification = mgmt.getPropertyKey("Identification")
}
// label
if (!mgmt.containsPropertyKey("vlabel")) {
vlabel = mgmt.makePropertyKey('vlabel').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
vlabel = mgmt.getPropertyKey("vlabel")
}
// phoneNum
if (!mgmt.containsPropertyKey("phoneNum")) {
phoneNum = mgmt.makePropertyKey('phoneNum').dataType(Long.class).cardinality(Cardinality.SINGLE).make()
} else {
phoneNum = mgmt.getPropertyKey("phoneNum")
}
// bankCardNum
if (!mgmt.containsPropertyKey("bankCardNum")) {
bankCardNum = mgmt.makePropertyKey('bankCardNum').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
bankCardNum = mgmt.getPropertyKey("bankCardNum")
}
// bulkLoader.vertex.id
if (!mgmt.containsPropertyKey("bulkLoader.vertex.id")) {
blid = mgmt.makePropertyKey("bulkLoader.vertex.id").dataType(Integer.class).make()
} else {
blid = mgmt.getPropertyKey("bulkLoader.vertex.id")
}
// build index
byPhoneComposite = mgmt.getGraphIndex("byPhoneComposite")
byOrderComposite = mgmt.getGraphIndex("byOrderComposite")
byBulkLoaderVertexId = mgmt.getGraphIndex("byBulkLoaderVertexId")
if (byPhoneComposite == null) {
byPhoneComposite = mgmt.buildIndex("byPhoneComposite", Vertex.class).addKey(phoneNum).buildCompositeIndex()
}
if (byOrderComposite == null) {
byOrderComposite = mgmt.buildIndex("byOrderComposite", Vertex.class).addKey(orderNo).buildCompositeIndex()
}
byBulkLoaderVertexId = mgmt.buildIndex("byBulkLoaderVertexId", Vertex.class).addKey(blid).buildCompositeIndex()
}
mgmt.commit()
graph.close()
graph = GraphFactory.open('conf/hadoop-graph/hadoop-script.properties')
hdfs.copyFromLocal("${path}/point.v", "point.v")
hdfs.copyFromLocal("${path}/point.groovy", "point.groovy")
graph.configuration.setInputLocation("point.v")
graph.configuration.setProperty("gremlin.hadoop.scriptInputFormat.script", "point.groovy")
blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
graph.compute(SparkGraphComputer).program(blvp).submit().get()
// load efile
hdfs.copyFromLocal("${path}/add100_e.txt", "edge.e")
hdfs.copyFromLocal("${path}/edge.groovy", "edge.groovy")
graph.configuration.setInputLocation("edge.e")
graph.configuration.setProperty("gremlin.hadoop.scriptInputFormat.script", "edge.groovy")
blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
graph.compute(SparkGraphComputer).program(blvp).submit().get()
6 执行 titan
./bin/gremlin.sh ./bulkLoader.groovy
7 如果数据量很大,就需要配置一些参数
schema.default = nonestorage.batch-loading = true
ids.block-size = 200000
ids.authority.wait = 1000
spark.master=local[4]