欢迎投稿

今日深度:

Cassandra 入门三-使用Thrift + Java进行数据操作,cassandrathrift

Cassandra 入门三-使用Thrift + Java进行数据操作,cassandrathrift


Thrift是一个驱动层的API,比较底层。Cassandra官方文档一直推荐使用更高级的客户端来开发应用,但我个人认为,先把Thrift的原理搞明白,有利于进一步学习。

代码取自《Cassandra: The Definitive Guide》,有改动,因为书里的代码太老了,不适用于Cassandra 1.2.1。

使用Thrift所需要的jar包就在Cassandra安装目录下的lib文件夹里,把它们加入Java项目的classpath即可。


这里我假设读者已经了解副本放置策略、副本因子等概念。

下面是我的一个连接类,用于建立Cassandra链接并返回一个client实例

package com.ovgu.cassandra.util;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

public class Connector {
	private static final Logger log = Logger.getLogger(Connector.class);

	/**
	 * Socket transport to Constants.HOST:Constants.PORT. It is created with
	 * the Connector, opened by {@link #connect()} and closed by
	 * {@link #close()}.
	 */
	TTransport tr = new TSocket(Constants.HOST, Constants.PORT);

	/** Client produced by the most recent successful connect(); null before that. */
	private Cassandra.Client client;

	/**
	 * Opens the transport and builds a Thrift client on top of it (framed
	 * transport + binary protocol). No keyspace is selected; one can be chosen
	 * later with {@link #setKeyspace(String)}.
	 * 
	 * @return the client bound to the freshly opened transport
	 * @throws TTransportException
	 *             if the underlying transport cannot be opened
	 */
	public Cassandra.Client connect() throws TTransportException {
		TFramedTransport tf = new TFramedTransport(tr);
		TProtocol proto = new TBinaryProtocol(tf);

		// Open the socket BEFORE publishing the client: if open() fails, the
		// field must not end up holding a client bound to a dead transport.
		tr.open();
		client = new Cassandra.Client(proto);

		log.debug(Constants.starLine + "connect to the server port:+"
				+ Constants.PORT + "+ host:" + Constants.HOST);

		return client;
	}

	/**
	 * Opens the transport, builds a client and selects {@code keyspace} on it.
	 * If selecting the keyspace fails, the transport that was just opened is
	 * closed again before the exception propagates, so a failed call does not
	 * leak an open socket.
	 * 
	 * @param keyspace
	 *            the keyspace to select on the new client
	 * @return a Cassandra client connected with the server
	 * @throws TTransportException
	 *             if the underlying transport cannot be opened
	 * @throws TException
	 *             if the set_keyspace call fails at the Thrift level
	 * @throws InvalidRequestException
	 *             if the server rejects the keyspace
	 */
	public Cassandra.Client connect(String keyspace)
			throws TTransportException, TException, InvalidRequestException {

		Cassandra.Client connected = this.connect();

		// A success flag with finally (rather than catch/rethrow) releases the
		// socket for every failure type, checked or unchecked.
		boolean keyspaceSelected = false;
		try {
			connected.set_keyspace(keyspace);
			keyspaceSelected = true;
		} finally {
			if (!keyspaceSelected) {
				close();
			}
		}

		return connected;
	}

	/**
	 * Selects {@code keyspace} on the current client. When no client has been
	 * created yet, an error is logged and the call returns without throwing.
	 * 
	 * @param keyspace
	 *            the keyspace to select
	 * @throws InvalidRequestException
	 *             if the server rejects the keyspace
	 * @throws TException
	 *             if the set_keyspace call fails at the Thrift level
	 */
	public void setKeyspace(String keyspace) throws InvalidRequestException,
			TException {
		if (client != null) {
			log.debug(Constants.starLine + "set a keyspace:" + keyspace
					+ "to client :port:+" + Constants.PORT + "+ host:"
					+ Constants.HOST);
			client.set_keyspace(keyspace);
		} else {
			log.error("client initial error, client is null");
		}
	}

	/** Closes the underlying transport. */
	public void close() {
		tr.close();
	}

}


1, 写入一个数据

写入数据就是要写入 行键值 列名 列值 和 时间戳

// Example: write two columns ("name", "age") into one row of an existing
// column family, then read them back with get() and get_slice().
Connector conn=new Connector();
try {
	// Client bound to Keyspace1 (selecting a keyspace is roughly like selecting a database)
	Cassandra.Client client=conn.connect("Keyspace1");

	String cfName="Standard";                   // a column family created beforehand (roughly a table)
	long timestamp=System.currentTimeMillis();  // every column write carries a timestamp; the system clock is used here
	byte[] userIDKey="1".getBytes(UTF8);        // row key, encoded with the same charset as the column names

	// Path to the single "name" column, used by the point read below
	ColumnPath colPathName=new ColumnPath(cfName);
	colPathName.setColumn("name".getBytes(UTF8));

	// Column family handle shared by both inserts and the slice read
	ColumnParent cp=new ColumnParent(cfName);

	//Insert the name column
	LOG.info("Inserting row for key "+ new String(userIDKey, UTF8));
	Column nameCol=new Column();
	nameCol.setName("name".getBytes(UTF8));
	nameCol.setValue("George Clinton".getBytes(UTF8));
	nameCol.setTimestamp(timestamp);
	client.insert(UsefulUtils.toByteBuffer(userIDKey),cp,nameCol,CL);

	//insert age column
	LOG.info("Inserting row for key "+ new String(userIDKey, UTF8));
	Column ageCol=new Column();
	ageCol.setName("age".getBytes(UTF8));
	ageCol.setValue("69".getBytes(UTF8));
	ageCol.setTimestamp(timestamp);
	client.insert(UsefulUtils.toByteBuffer(userIDKey),cp,ageCol,CL);

	LOG.info("Row insert done");

	//read the name only
	LOG.info("reading name column:");
	Column col=client.get(UsefulUtils.toByteBuffer(userIDKey), colPathName, CL).getColumn();

	LOG.info("Column name :"+ UsefulUtils.toString(col.name));
	LOG.info("Column value :"+ UsefulUtils.toString(col.value));
	// Report the timestamp stored on the column that was read back,
	// not the local variable used for the write.
	LOG.info("Column timestamp :"+ col.timestamp);

	//slice predicate selecting the columns from "age" through "name"
	SlicePredicate predicate =new SlicePredicate();
	SliceRange sliceRange=new SliceRange();
	sliceRange.setStart("age".getBytes(UTF8));
	sliceRange.setFinish("name".getBytes(UTF8));
	predicate.setSlice_range(sliceRange);

	LOG.info("Complete Row:");

	//read the columns of the row that match the slice predicate
	List<ColumnOrSuperColumn> results=client.get_slice(UsefulUtils.toByteBuffer(userIDKey), cp, predicate, CL);

	//loop over columns,outputting values.
	for (ColumnOrSuperColumn result : results){
		Column column =result.column;
		LOG.info(UsefulUtils.toString(column.name) +":" +UsefulUtils.toString(column.value));
	}
} finally {
	// Release the connection even when one of the Thrift calls above throws
	conn.close();
}
LOG.info("all DONE");



下面是一个简单的工具类。书中的老代码一直使用byte[],而Cassandra新版本把数据类型改成了ByteBuffer,所以只好写几个小函数来做转换。

package com.ovgu.cassandra.util;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;

import org.apache.log4j.Logger;

/**
 * 
 * @author shuo wang
 * @version 1.0
 * 
 */
public class UsefulUtils {

	private static final Logger log = Logger.getLogger(UsefulUtils.class);

	/**
	 * Wraps a byte array in a ByteBuffer.
	 * 
	 * @param bytes
	 *            the bytes to wrap
	 * @return a ByteBuffer wrapping {@code bytes}
	 * @throws UnsupportedEncodingException
	 *             declared for signature compatibility; not thrown by this
	 *             method's body
	 */
	public static ByteBuffer toByteBuffer(byte[] bytes)
			throws UnsupportedEncodingException {
		// Guarded so the array is only rendered when debug output is wanted.
		// Arrays.toString prints the contents; byte[].toString() would only
		// print an identity hash such as "[B@1f2e3d".
		if (log.isDebugEnabled()) {
			log.debug(Constants.starLine + "toByteBuffer tranfers a byte[]"
					+ java.util.Arrays.toString(bytes) + " to ByteBuffer");
		}
		return ByteBuffer.wrap(bytes);
	}

	/**
	 * Encodes a string with the charset named by Constants.UTF8 and wraps the
	 * resulting bytes in a ByteBuffer.
	 * 
	 * @param value
	 *            the string to encode
	 * @return a ByteBuffer holding the encoded bytes
	 * @throws UnsupportedEncodingException
	 *             if the charset named by Constants.UTF8 is not supported
	 */
	public static ByteBuffer toByteBuffer(String value)
			throws UnsupportedEncodingException {
		log.debug(Constants.starLine + "toByteBuffer tranfers a String" + value
				+ " to ByteBuffer");
		return ByteBuffer.wrap(value.getBytes(Constants.UTF8));
	}

	/**
	 * Decodes the remaining bytes of {@code buffer} as UTF-8 text. The bytes
	 * are read through a duplicate, so the caller's buffer keeps its position
	 * and limit and can be decoded again.
	 * 
	 * @param buffer
	 *            the buffer whose remaining bytes are decoded
	 * @return the decoded string
	 * @throws UnsupportedEncodingException
	 *             if UTF-8 is not supported by the runtime
	 */
	public static String toString(ByteBuffer buffer)
			throws UnsupportedEncodingException {

		// duplicate() shares the content but has an independent position, so
		// the bulk get below does not drain the caller's buffer.
		ByteBuffer view = buffer.duplicate();
		byte[] bytes = new byte[view.remaining()];
		view.get(bytes);
		String result = new String(bytes, "UTF-8");
		log.debug(Constants.starLine + "toByteBuffer tranfers a ByteBuffer:"
				+ buffer.toString() + " to String:" + result);
		return result;
	}

	/**
	 * Decodes {@code key} and {@code value} as UTF-8 and prints them to
	 * standard output on one line. A decoding failure is logged, including
	 * its stack trace, and is not rethrown.
	 * 
	 * @param key
	 *            buffer holding the name
	 * @param value
	 *            buffer holding the value
	 */
	public static void formatKeyValueOutPut(ByteBuffer key, ByteBuffer value) {

		try {
			String name = UsefulUtils.toString(key);
			String val = UsefulUtils.toString(value);

			System.out.println("Formatter: the name: " + name + ", the value:"
					+ val);
		} catch (UnsupportedEncodingException e) {
			// Pass the exception itself so the log keeps the stack trace
			log.error("ERROR, NOT SUPPORT TYPE,PLEASE READ THE FOLLOWING INFORMATINO!", e);
		}

	}
}



www.htsjk.Com true http://www.htsjk.com/cassandra/30309.html NewsArticle Cassandra 入门三-使用Thrift +java和进行数据操作,cassandrathrift Thrift是一个驱动车API,比较低级, cassandra官方文档一直在推荐高级点的去开发应用,但是我个人认为,还是先把Thrift原理搞明白,有...
相关文章
    暂无相关文章
评论暂时关闭