Cassandra 入门三-使用Thrift +java和进行数据操作,cassandrathrift
Thrift是一个驱动车API,比较低级, cassandra官方文档一直在推荐高级点的去开发应用,但是我个人认为,还是先把Thrift原理搞明白,有利于进一步学习.
代码取自the definitive guide cassandra,有改动,因为书里面代码太老了,不适应1.2.1
使用Thrift需要的包就在安装cassandra里面的lib文件夹里面,放在java里
这里我假设已经了解 副本放置策略,副本因子等,
下面是我的一个连接类,用于建立Cassandra链接并返回一个client实例
package com.ovgu.cassandra.util;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
public class Connector {
TTransport tr = new TSocket(Constants.HOST, Constants.PORT);
private static final Logger log = Logger.getLogger(Connector.class);
private Cassandra.Client client;
/**
*
* @return connect to a client without specify a keyspace, can spacify a
* keyspace later with setKeyspace();
*
* @throws TTransportException
*/
public Cassandra.Client connect() throws TTransportException {
TFramedTransport tf = new TFramedTransport(tr);
TProtocol proto = new TBinaryProtocol(tf);
client = new Cassandra.Client(proto);
tr.open();
log.debug(Constants.starLine + "connect to the server port:+"
+ Constants.PORT + "+ host:" + Constants.HOST);
return client;
}
// return a connection to kespace
/**
*
* @param keyspace
* the default connected keyspace
* @return a Cassandra client connected with the server
* @throws TTransportException
* @throws TException
* @throws InvalidRequestException
*/
public Cassandra.Client connect(String keyspace)
throws TTransportException, TException, InvalidRequestException {
client = this.connect();
client.set_keyspace(keyspace);
return client;
}
public void setKeyspace(String keyspace) throws InvalidRequestException,
TException {
if (client != null) {
log.debug(Constants.starLine + "set a keyspace:" + keyspace
+ "to client :port:+" + Constants.PORT + "+ host:"
+ Constants.HOST);
client.set_keyspace(keyspace);
} else {
log.error("client initial error, client is null");
}
}
public void close() {
tr.close();
}
}
1, 写入一个数据
写入数据就是要写入 行键值 列名 列值 和 时间戳
Connector conn=new Connector();
Cassandra.Client client=conn.connect("Keyspace1");//建立client实例,用Keyspace1 等同于指定一个database
String cfName="Standard"; //实现建立好的一个column family 等同于指定表格
long timestamp=System.currentTimeMillis(); //定义一个时间戳,取系统时间 每一列操作都要有一个时间戳的
byte[] userIDKey="1".getBytes();
ColumnPath colPathName=new ColumnPath(cfName);
colPathName.setColumn("name".getBytes(UTF8));
ColumnParent cp=new ColumnParent(cfName);
//Insert the name column
LOG.info("Inserting row for key "+ new String(userIDKey));
Column nameCol=new Column();
nameCol.setName("name".getBytes(UTF8));
nameCol.setValue("George Clinton".getBytes());
nameCol.setTimestamp(timestamp);
client.insert(UsefulUtils.toByteBuffer(userIDKey),cp,nameCol,CL);
//insert age column
LOG.info("Inserting row for key "+ new String(userIDKey));
Column ageCol=new Column();
ageCol.setName("age".getBytes(UTF8));
ageCol.setValue("69".getBytes(UTF8));
ageCol.setTimestamp(timestamp);
client.insert(UsefulUtils.toByteBuffer(userIDKey),cp,ageCol,CL);
LOG.info("Row insert done");
//read the name only
LOG.info("reading name column:");
Column col=client.get(UsefulUtils.toByteBuffer(userIDKey), colPathName, CL).getColumn();
LOG.info("Column name :"+ UsefulUtils.toString(col.name));
LOG.info("Column value :"+ UsefulUtils.toString(col.value));
LOG.info("Column timestamp :"+ timestamp);
//create a slice predicate representing the columns to read start and finish
SlicePredicate predicate =new SlicePredicate();
SliceRange sliceRange=new SliceRange();
sliceRange.setStart("age".getBytes());
sliceRange.setFinish("name".getBytes());
predicate.setSlice_range(sliceRange);
LOG.info("Complete Row:");
//read all columns in the row
ColumnParent parent =new ColumnParent(cfName);
List<ColumnOrSuperColumn> results=client.get_slice(UsefulUtils.toByteBuffer(userIDKey), parent, predicate, CL);
//loop over columns,outputting values.
for (ColumnOrSuperColumn result : results){
Column column =result.column;
LOG.info(UsefulUtils.toString(column.name) +":" +UsefulUtils.toString(column.value));
}
conn.close();
LOG.info("all DONE");一个简单的工具包,由于书中老代码一直用byte[] 而cassandra新版本就把数据类型改了,用bytebuffer,只有写个小function来转换
package com.ovgu.cassandra.util;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.apache.log4j.Logger;
/**
*
* @author shuo wang
* @version 1.0
*
*/
public class UsefulUtils {
/**
*
* @param bytes
* turn byte array to ByteBuffer
* @return ByteBuffer
* @throws UnsupportedEncodingException
*/
private static Logger log = Logger.getLogger(UsefulUtils.class);
public static ByteBuffer toByteBuffer(byte[] bytes)
throws UnsupportedEncodingException {
log.debug(Constants.starLine + "toByteBuffer tranfers a byte[]"
+ bytes.toString() + " to ByteBuffer");
return ByteBuffer.wrap(bytes);
}// turn a string into Byttebuffer
/**
*
* @param value
* - turn value into ByteBuffer
* @return ByteBuffer
* @throws UnsupportedEncodingException
*/
public static ByteBuffer toByteBuffer(String value)
throws UnsupportedEncodingException {
log.debug(Constants.starLine + "toByteBuffer tranfers a String" + value
+ " to ByteBuffer");
return ByteBuffer.wrap(value.getBytes(Constants.UTF8));
}// turn a string into Bytebuffer
/**
*
* @param buffer
* , return toString use UTF8 encode
* @return
* @throws UnsupportedEncodingException
*/
public static String toString(ByteBuffer buffer)
throws UnsupportedEncodingException {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String result = new String(bytes, "UTF-8");
log.debug(Constants.starLine + "toByteBuffer tranfers a ByteBuffer:"
+ buffer.toString() + " to String:" + result);
return result;
}
public static void formatKeyValueOutPut(ByteBuffer key, ByteBuffer value) {
try {
String name = UsefulUtils.toString(key);
String val = UsefulUtils.toString(value);
System.out.println("Formatter: the name: " + name + ", the value:"
+ val);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
log.error("ERROR, NOT SUPPORT TYPE,PLEASE READ THE FOLLOWING INFORMATINO!");
log.error(e.toString());
}
}
}
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。