Cassandra Learning Notes: Basic Features and API Operations

Apache Cassandra is an open-source, distributed key-value storage system. It was originally developed by Facebook to store extremely large amounts of data.

Key characteristics: distributed, column-based structured storage, high scalability.

Cassandra's defining trait is that it is not a single database but a distributed network service made up of a cluster of database nodes. A write to Cassandra is replicated to other nodes, and a read is likewise routed to some node in the cluster to be served. Scaling a Cassandra cluster is therefore a simple matter: just add nodes to it.

Cassandra is a hybrid non-relational database, similar to Google's BigTable. Its feature set is richer than that of Dynomite (a distributed key-value store) but less complete than the document store MongoDB (an open-source product positioned between relational and non-relational databases; it is the most feature-rich of the non-relational databases and the closest to a relational one, with a very loose data model based on a JSON-like BSON format that can hold fairly complex data types). Cassandra started at Facebook and later became an open-source project. It is an ideal database for social networking and cloud computing. It builds on Amazon's proprietary, fully distributed Dynamo and combines it with Google BigTable's column-family (Column Family) data model, using peer-to-peer, decentralized storage; in many respects it can be called Dynamo 2.0.

Compared with other databases, Cassandra has several standout features:

Flexible schema: as with a document store, you do not have to settle a record's fields in advance. You can add or remove fields freely while the system is running. This is a remarkable efficiency gain, especially in large deployments.
True scalability: Cassandra scales purely horizontally. To add capacity to a cluster, just point it at another machine; you do not have to restart any process, change application queries, or migrate any data by hand.
Multi-data-center awareness: you can lay out your nodes so that even if one data center catches fire, a standby data center still holds at least one full copy of every record.

Data storage strategy in Cassandra:

1. CommitLog: records the data and operations submitted by clients. It is persisted to disk so that data not yet durably stored can be recovered. Database commit logs come in three flavors: undo logs, redo logs, and undo-redo logs. Because Cassandra tells new data from old by timestamp and never overwrites existing data, no undo operation is needed, so its commit log is a redo log.

2. Memtable: the in-memory form of written data, sorted by key. A Memtable is an in-memory structure that is flushed to disk in batches once certain conditions are met. There is also another form, BinaryMemtable, which Cassandra does not currently use.

3. SSTable: data persisted to disk, in three formats: Data, Index, and Filter. Once written, an SSTable is immutable and can only be read. Because SSTables cannot be modified and one column family (CF) generally maps to multiple SSTables, scanning every SSTable on each lookup would add a great deal of work. To avoid unnecessary SSTable scans, Cassandra uses a BloomFilter: several hash functions map each key into a bitmap, giving a fast check of which SSTable a key may belong to. To curb the overhead of accumulating many SSTables, Cassandra periodically runs compaction, which, simply put, merges multiple SSTables of the same CF into one.
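The BloomFilter check described above can be sketched as follows. This is a minimal illustration of the idea, not Cassandra's actual implementation; the class name and the double-hashing scheme are invented for the example.

```java
import java.util.BitSet;

// Minimal Bloom-filter sketch: k hash functions map a key into a bit array.
// A key "might be" in an SSTable only if all k of its bits are set.
public class SSTableBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    public SSTableBloomFilter(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    // Derive k hash values from two base hashes (double hashing).
    private int hash(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, size);
    }

    public void add(String key) {
        for (int i = 0; i < hashCount; i++) bits.set(hash(key, i));
    }

    // false => definitely absent; true => possibly present (scan the SSTable).
    public boolean mightContain(String key) {
        for (int i = 0; i < hashCount; i++)
            if (!bits.get(hash(key, i))) return false;
        return true;
    }
}
```

A lookup first asks mightContain; only SSTables whose filter answers true need to be scanned, at the cost of occasional false positives.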

Data partitioning strategy in Cassandra:

Key/value pairs are placed on different nodes according to the key. The Partitioner assigns each Cassandra node a token according to some policy; each key/value pair, after some computation, is assigned to its corresponding node.

The following partitioning strategies are provided:

org.apache.cassandra.dht.RandomPartitioner:

Places key/value pairs evenly across the nodes by the MD5 hash of the key. Because the keys end up unordered, this strategy cannot support range queries over keys.
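The token computation can be sketched like this; a minimal illustration, assuming only that the token is derived from an MD5 digest of the key as described above (the class name is invented for the example):

```java
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of the RandomPartitioner idea: hash the key with MD5 and treat the
// digest as a non-negative BigInteger token. Keys spread uniformly over the
// token ring, which is why key-range scans become impossible.
public class Md5Token {
    public static BigInteger tokenFor(byte[] key) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            // abs() keeps the token non-negative, mirroring the 0..2^127 range.
            return new BigInteger(md5.digest(key)).abs();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is always available
        }
    }
}
```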

org.apache.cassandra.dht.ByteOrderedPartitioner (BOP):

Places key/value pairs on the nodes sorted by the key's byte order. This Partitioner lets users scan data in key order, but it can lead to unbalanced load.

org.apache.cassandra.dht.OrderPreservingPartitioner:

A deprecated variant of BOP that only supports keys that are UTF-8 encoded strings.

org.apache.cassandra.dht.CollatingOrderPreservingPartitioner:

Supports key ordering under EN or US locales.

Replication strategy in Cassandra:

To ensure reliability, data is generally written N times. One copy goes to the node determined by the data partitioning strategy; a replication strategy decides where the remaining N-1 copies are placed.

SimpleStrategy (formerly RackUnawareStrategy, i.e. org.apache.cassandra.locator.RackUnawareStrategy):

Ignores data centers: walking the tokens from smallest to largest, it takes the next N nodes starting at the primary token's position as replicas.
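The clockwise walk over the token ring can be sketched as follows; a minimal illustration under the assumptions above, with a sorted token-to-node map standing in for the ring and invented names throughout:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of SimpleStrategy placement: starting from the first token at or
// after the key's token, walk the ring in ascending order (wrapping at the
// end) and take the next N nodes as replicas, ignoring racks and data centers.
public class SimpleReplicaPlacement {
    public static List<String> replicas(TreeMap<Long, String> ring, long keyToken, int n) {
        List<String> result = new ArrayList<>();
        // First node whose token >= keyToken, wrapping around if none exists.
        Long t = ring.ceilingKey(keyToken);
        if (t == null) t = ring.firstKey();
        while (result.size() < n && result.size() < ring.size()) {
            result.add(ring.get(t));
            t = ring.higherKey(t);
            if (t == null) t = ring.firstKey(); // wrap past the largest token
        }
        return result;
    }
}
```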

OldNetworkTopologyStrategy (formerly RackAwareStrategy, i.e. org.apache.cassandra.locator.RackAwareStrategy):

Data-center aware: it first stores N-1 copies on different racks within the primary token's data center, then stores one copy on a node in a different data center. This strategy is well suited to multi-data-center deployments, raising system reliability at the price of some performance (data latency).

NetworkTopologyStrategy (formerly DatacenterShardStrategy, i.e. org.apache.cassandra.locator.DataCenterShardStrategy):

This requires a replication-strategy properties file that defines the number of replicas in each data center. The per-data-center replica counts must sum to the keyspace's replication factor.
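That constraint is simple enough to state in code; a trivial sketch of the sum rule described above, with invented names (data-center labels here are illustrative):

```java
import java.util.Map;

// Sketch of the NetworkTopologyStrategy constraint: the replica count
// configured for each data center must add up to the keyspace's total
// replication factor.
public class DatacenterReplicas {
    public static int totalReplicas(Map<String, Integer> perDatacenter) {
        return perDatacenter.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static boolean matchesKeyspace(Map<String, Integer> perDatacenter, int replicationFactor) {
        return totalReplicas(perDatacenter) == replicationFactor;
    }
}
```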

Network topology (snitch) strategy in Cassandra:

Mainly used to compute the relative distance between hosts, telling Cassandra about your network topology so that user requests can be routed more efficiently.

org.apache.cassandra.locator.SimpleSnitch:

Uses the hosts' logical distance on the Cassandra ring as their relative distance.

org.apache.cassandra.locator.RackInferringSnitch:

Relative distance is determined by rack and data center, which correspond to the third and second octets of the IP address respectively. That is, two nodes are considered to be on the same rack if the first three octets of their IPs are equal (different nodes on the same rack are equidistant), and in the same data center if the first two octets are equal (different nodes in the same data center are equidistant).
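The inference rule can be sketched as follows; a minimal illustration of the octet comparison described above, with class and method names invented for the example:

```java
import java.util.Arrays;

// Sketch of the RackInferringSnitch rule: nodes sharing the first three
// octets of their IPv4 address are treated as being on the same rack;
// sharing the first two octets puts them in the same data center.
public class RackInference {
    // Join the first `octets` components of a dotted-quad address.
    private static String prefix(String ip, int octets) {
        String[] parts = ip.split("\\.");
        return String.join(".", Arrays.copyOf(parts, octets));
    }

    public static boolean sameRack(String a, String b) {
        return prefix(a, 3).equals(prefix(b, 3));
    }

    public static boolean sameDatacenter(String a, String b) {
        return prefix(a, 2).equals(prefix(b, 2));
    }
}
```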

org.apache.cassandra.locator.PropertyFileSnitch:

Relative distance is determined by rack and data center, which are configured in the cassandra-topology.properties file.

Consistency strategy in Cassandra:

Cassandra adopts eventual consistency: although the replicas of a data object in a distributed system may be inconsistent for a short while, after some time they eventually converge to the same state.

One of Cassandra's features is that users can choose a consistency level for every read, insert, or delete. The Cassandra API currently supports the following consistency levels:

ZERO: meaningful only for inserts and deletes. The coordinating node sends the change to all replica nodes but does not wait for any acknowledgement, so no consistency is guaranteed at all.

ONE: for inserts and deletes, the coordinator guarantees the change has been written to the commit log and Memtable of one storage node; for reads, it returns as soon as it has obtained the data from one storage node.

QUORUM: suppose the object has n replica nodes. Inserts and deletes are guaranteed to be written to at least n/2+1 storage nodes; reads query n/2+1 storage nodes and return the data with the newest timestamp.

ALL: for inserts and deletes, the coordinator returns success to the client only after all n nodes (n being the replication factor) have succeeded; if any single node fails, the operation fails. For reads, all n nodes are queried and the data with the newest timestamp is returned; likewise, if any node fails to return data, the read is considered failed.

Cassandra's default read/write mode is W(QUORUM)/R(QUORUM). In fact, as long as W + R > N (where N is the replica count, W the number of nodes written, and R the number of nodes read), the written and read node sets overlap and the result is strongly consistent; if W + R <= N, consistency is weak.
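The arithmetic is worth making concrete; a trivial sketch of the quorum size and the overlap rule above (the class name is invented):

```java
// Quorum size and strong-consistency check for replicated reads/writes.
public class ConsistencyMath {
    // QUORUM touches a majority of the replicas: n/2 + 1 (integer division).
    public static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    // W + R > N guarantees the read set overlaps the most recent write set,
    // so at least one replica read carries the newest timestamp.
    public static boolean isStronglyConsistent(int w, int r, int n) {
        return w + r > n;
    }
}
```

With the default W(QUORUM)/R(QUORUM) and N = 3, W = R = 2 and 2 + 2 > 3, so reads always see the latest write.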

Cassandra maintains eventual consistency through four techniques: anti-entropy, read repair, hinted handoff, and distributed delete.

(1) Anti-entropy

A synchronization mechanism between replicas: nodes periodically cross-check the consistency of their data objects, using Merkle trees to detect inconsistencies.

(2) Read repair

Reading an object triggers a consistency check on that object.

Example: when reading key A, the system reads all replicas of key A; if it finds any inconsistency, it repairs them.

If the read consistency level is ONE, the replica closest to the client is returned immediately and read repair then runs in the background, which means the first read may not return the latest data. If the level is QUORUM, a replica is returned to the client once more than half of the replicas agree, while consistency checks and repairs on the remaining nodes run in the background. At the strictest level (ALL), a consistent replica is returned only after read repair has completed. This mechanism helps shrink the window before eventual consistency is reached.
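The resolution step (newest timestamp wins) can be sketched as follows; a minimal illustration in which the Version holder and its fields are invented for the example:

```java
import java.util.List;

// Sketch of read-repair resolution: among the replica versions returned
// for a key, the value with the highest timestamp wins; stale replicas
// would then be overwritten with it in the background.
public class ReadRepair {
    public static class Version {
        public final long timestamp;
        public final String value;
        public Version(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    public static Version resolve(List<Version> replicas) {
        Version newest = replicas.get(0);
        for (Version v : replicas)
            if (v.timestamp > newest.timestamp) newest = v;
        return newest;
    }
}
```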

(3) Hinted handoff

For a write, if one of the target nodes is offline, the object is first relayed to another node; that relay node hands the object to the target once the target comes back online.

Example: by the placement rules, key A should first be written to node N1 and then replicated to N2. If N1 is down and writing to N2 satisfies the ConsistencyLevel, the RowMutation for key A is wrapped with a header carrying a hint (recording that the target is N1) and written to a random node N3; this copy is not readable. A normal copy is also replicated to N2, and that copy can serve reads. If writing to N2 does not satisfy the write consistency level, the write fails. When N1 recovers, the hinted data that should originally have gone to N1 is written back to N1.
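The hint-and-replay flow can be sketched as follows; a minimal in-memory illustration of the mechanism described above, with all names invented:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of hinted handoff: when a target node is down, the mutation is
// held elsewhere under a hint naming the target; when the target comes
// back up, the pending hinted mutations are replayed to it.
public class HintedHandoff {
    private final Map<String, List<String>> hints = new HashMap<>();   // target -> pending writes
    private final Map<String, List<String>> storage = new HashMap<>(); // node -> stored writes
    private final Set<String> up = new HashSet<>();

    public void markUp(String node)   { up.add(node); replay(node); }
    public void markDown(String node) { up.remove(node); }

    public void write(String target, String mutation) {
        if (up.contains(target)) {
            storage.computeIfAbsent(target, k -> new ArrayList<>()).add(mutation);
        } else {
            // Target offline: keep the mutation with a hint for later delivery.
            hints.computeIfAbsent(target, k -> new ArrayList<>()).add(mutation);
        }
    }

    private void replay(String node) {
        List<String> pending = hints.remove(node);
        if (pending != null)
            storage.computeIfAbsent(node, k -> new ArrayList<>()).addAll(pending);
    }

    public List<String> dataOn(String node) {
        return storage.getOrDefault(node, List.of());
    }
}
```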

(4) Distributed delete

Deletion on a single machine is very simple: just remove the data from disk. Distributed deletion is different; its difficulty is this: if one replica node A of an object is offline while the other replica nodes delete the object, then when A comes back online it does not know the data has been deleted, so it will try to restore the object on the other replicas, rendering the delete ineffective. Cassandra's solution is to not delete a data object locally right away, but to mark it with a hint (a tombstone) and periodically garbage-collect hinted objects. Until garbage collection, the hint remains, giving other nodes the chance to learn of it through the other consistency mechanisms. By turning a delete into an insert, Cassandra neatly solves the problem.
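The tombstone approach can be sketched as follows; a minimal single-node illustration (names invented) of how a delete written as a timestamped marker keeps stale writes from resurrecting deleted data:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of distributed delete via tombstones: a delete is stored as a
// timestamped marker rather than a removal, so replicas that were offline
// can still learn of the deletion before garbage collection runs.
public class TombstoneStore {
    private static class Cell {
        final String value;
        final long timestamp;
        final boolean tombstone;
        Cell(String value, long timestamp, boolean tombstone) {
            this.value = value;
            this.timestamp = timestamp;
            this.tombstone = tombstone;
        }
    }

    private final Map<String, Cell> cells = new HashMap<>();

    public void put(String key, String value, long ts) {
        merge(key, new Cell(value, ts, false));
    }

    // A delete is just another write: insert a tombstone marker.
    public void delete(String key, long ts) {
        merge(key, new Cell(null, ts, true));
    }

    private void merge(String key, Cell incoming) {
        Cell current = cells.get(key);
        // Newer timestamp wins, so a tombstone beats any older value.
        if (current == null || incoming.timestamp > current.timestamp)
            cells.put(key, incoming);
    }

    public String get(String key) {
        Cell c = cells.get(key);
        return (c == null || c.tombstone) ? null : c.value;
    }
}
```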

Cassandra provides a lightweight transaction mechanism, using compare-and-set (CAS) to meet stronger consistency needs where eventual consistency is not enough. For example, an IF clause in INSERT or UPDATE invokes such a lightweight transaction:

INSERT INTO customer_account (customerID, customer_email)
VALUES ('LauraS', 'lauras@gmail.com')
IF NOT EXISTS;

UPDATE customer_account
SET    customer_email = 'laurass@gmail.com'
IF     customer_email = 'lauras@gmail.com';

Setting up a Cassandra environment is covered at the following links:
http://wiki.apache.org/cassandra/GettingStarted
https://www.digitalocean.com/community/tutorials/how-to-configure-a-multi-node-cluster-with-cassandra-on-a-ubuntu-vps

For the API connection, driver version 2.1 is used here. CQL is the recommended and more convenient route, so Thrift connections are not covered in depth.

A basic connection can be established in either of the following ways:

// CQL (DataStax Java driver):
Cluster cluster = Cluster.builder().addContactPoint("220.181.29.16").build();
Session session = cluster.connect("tkeyspace");
// ... perform operations through session

// Thrift:
TTransport transport = new TFramedTransport(new TSocket("220.181.29.16", 9160));
TProtocol protocol = new TBinaryProtocol(transport);
Cassandra.Client client = new Cassandra.Client(protocol);
transport.open();
// ... perform operations through client

Integrating Cassandra with Spring works as follows.

First, add the required dependencies to pom.xml:

<dependency>
	<groupId>org.springframework.data</groupId>
	<artifactId>spring-data-cassandra</artifactId>
	<version>1.0.0.RELEASE</version>
	<exclusions>
		<exclusion>
			<groupId>com.google.collections</groupId>
			<artifactId>google-collections</artifactId>
		</exclusion>
		<exclusion>
			<groupId>com.datastax.cassandra</groupId>
			<artifactId>cassandra-driver-core</artifactId>
		</exclusion>
		<exclusion>
			<groupId>com.datastax.cassandra</groupId>
			<artifactId>cassandra-driver-dse</artifactId>
		</exclusion>
	</exclusions>
</dependency>
		
<dependency>
	<groupId>com.datastax.cassandra</groupId>
	<artifactId>cassandra-driver-core</artifactId>
	<version>2.1.0</version>
</dependency>
		
<dependency>
	<groupId>com.datastax.cassandra</groupId>
	<artifactId>cassandra-driver-dse</artifactId>
	<version>2.1.0</version>
</dependency>
		
<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
	<version>14.0.1</version>
</dependency>

The applicationContext-cassandra.xml configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:cassandra="http://www.springframework.org/schema/data/cassandra"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
		http://www.springframework.org/schema/data/cassandra http://www.springframework.org/schema/data/cassandra/spring-cassandra-1.0.xsd">

	<!-- Load project configuration -->
	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" p:location="classpath:cassandra.properties"/>
	
	<cassandra:cluster contact-points="${cassandra.contactpoints}" port="${cassandra.port}"></cassandra:cluster>
    
    <cassandra:session keyspace-name="${cassandra.keyspace}"></cassandra:session>
    <!--
    <cassandra:repositories base-package="cassandra.repo"></cassandra:repositories>
    -->
    <cassandra:mapping entity-base-packages="cassandra.entity"></cassandra:mapping>
    
    <cassandra:converter />
    
    <cassandra:template id="cassandraTemplate"/>
    
    
</beans>

The related code is as follows:

import java.io.Serializable;
import java.util.UUID;

import org.springframework.data.cassandra.mapping.Column;
import org.springframework.data.cassandra.mapping.PrimaryKey;
import org.springframework.data.cassandra.mapping.Table;

@Table(value = "t_user")
public class User implements Serializable {
	
	private static final long serialVersionUID = 1L;
	
	@PrimaryKey(value = "user_id")
	private UUID userId;
	@Column(value = "fname")
	private String fname;
	@Column(value = "lname")
	private String lname;
	@Column(value = "age")
	private int age;
	
	public User() {
	}
	
	public User(String fname, String lname, int age) {
		this.fname = fname;
		this.lname = lname;
		this.age = age;
	}

	public UUID getUserId() {
		return userId;
	}

	public void setUserId(UUID userId) { 
		this.userId = userId;
	}

	public String getFname() {
		return fname;
	}

	public void setFname(String fname) {
		this.fname = fname;
	}

	public String getLname() {
		return lname;
	}

	public void setLname(String lname) {
		this.lname = lname;
	}

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}
	
}

import java.util.List;
import java.util.Map;

import com.netease.talk.dao.cassandra.query.Query;
import com.netease.talk.dao.cassandra.query.QueryResult;
import com.netease.talk.dao.cassandra.thingdb.Thing;
import com.netease.talk.dao.cassandra.thingdb.ThingData;

public interface IBasicDao<Entity, ID> {

	public void insert(Entity entity);
	
	public void insert(Thing thing);
	
	public void insert(ThingData thingData);
	
	public void insertThing(Entity entity);
	
	public void insertThingData(Entity entity);
	
	public void update(Entity entity);
	
	public void update(Thing thing);
	
	public void update(ThingData thingData);
	
	public void updateThing(Entity entity);
	
	public void updateThingData(Entity entity);
	
	public void deleteById(ID id);
	
	public void deleteThingById(ID id);
	
	public void deleteThingDataById(ID id);
	
	public void deleteThingDataByIdAndKey(ID id, String key);

	public Entity getOneById(ID id);
	
	public Entity getOneByParams(Map<String, Object> params);
	
	public List<Entity> getAll();
	
	public List<Entity> getAllByParams(Map<String, Object> params);
	
	public QueryResult<Entity> getAll(Query query);
	
	public int getCount(Query query);
	
}

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.Resource;

import org.apache.log4j.Logger;
import org.springframework.cassandra.core.QueryOptions;
import org.springframework.cassandra.core.RetryPolicy;
import org.springframework.data.cassandra.core.CassandraTemplate;
import org.springframework.stereotype.Repository;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Delete;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Update;
import com.datastax.driver.core.querybuilder.Update.Assignments;
import com.netease.talk.dao.cassandra.IBasicDao;
import com.netease.talk.dao.cassandra.query.CQLBuilder;
import com.netease.talk.dao.cassandra.query.Query;
import com.netease.talk.dao.cassandra.query.QueryResult;
import com.netease.talk.dao.cassandra.thingdb.Thing;
import com.netease.talk.dao.cassandra.thingdb.ThingData;
import com.netease.talk.dao.cassandra.thingdb.ThingUtils;

@Repository("basicDao")
public class BasicDaoImpl<Entity, ID> implements IBasicDao<Entity, ID> {
	
	private static Logger LOG = Logger.getLogger(BasicDaoImpl.class);
	
	private Class<Entity> entityClass = null;
	
	@Resource(name = "cassandraTemplate")
	private CassandraTemplate cassandraTemplate = null;
	
	@SuppressWarnings("unchecked")
	public BasicDaoImpl() {
		Type type = getClass().getGenericSuperclass();
        if (type instanceof ParameterizedType) {
            entityClass = (Class<Entity>) ((ParameterizedType) type).getActualTypeArguments()[0];
        }
	}
	
	public CassandraTemplate getCassandraTemplate() {
		return cassandraTemplate;
	}
	
	public Session getSession () {
		return cassandraTemplate.getSession();
	}
	
	public String thingTable() {
		return ThingUtils.thingTable(entityClass);
	}
	
	public String dataTable() {
		return ThingUtils.dataTable(entityClass);
	}
	
	public Entity newEntityInstance() {
		Entity entity = null;
		try {
			entity = entityClass.newInstance();
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
		return entity;
	}

	@Override
	public void insert(Entity entity) {
		insertThing(entity);
		insertThingData(entity);
	}
	
	@Override
	public void insert(Thing thing) {
		Insert insert = QueryBuilder.insertInto(thingTable());
		insert.setConsistencyLevel(ConsistencyLevel.QUORUM);
		Map<String, Object> map = ThingUtils.convertObjectToMap(thing, true);
		for (Map.Entry<String, Object> entry : map.entrySet()) {
			insert.value(entry.getKey(), entry.getValue());
		}
		cassandraTemplate.execute(insert);
	}

	@Override
	public void insert(ThingData thingData) {
		Insert insert = QueryBuilder.insertInto(dataTable());
		insert.setConsistencyLevel(ConsistencyLevel.QUORUM);
		Map<String, Object> map = ThingUtils.convertObjectToMap(thingData, true);
		for (Map.Entry<String, Object> entry : map.entrySet()) {
			insert.value(entry.getKey(), entry.getValue());
		}
		cassandraTemplate.execute(insert);
	}
	
	@Override
	public void insertThing(Entity entity) {
		insert(ThingUtils.extractThing(entity));
	}
	
	@Override
	public void insertThingData(Entity entity) {
		List<ThingData> thingDatas = ThingUtils.extractThingDatas(entity);
		for (ThingData thingData : thingDatas) {
			insert(thingData);
		}
	}
	
	@Override
	public void update(Entity entity) {
		updateThing(entity);
		updateThingData(entity);
	}
	
	@Override
	public void update(Thing thing) {
		Update update = QueryBuilder.update(thingTable());
		update.setConsistencyLevel(ConsistencyLevel.QUORUM);
		Map<String, Object> map = ThingUtils.convertObjectToMap(thing, true);
		String id = (String) map.get("id");
		map.remove("id");
		Assignments assignments = update.with();
		for (Map.Entry<String, Object> entry : map.entrySet()) {
			assignments.and(QueryBuilder.set(entry.getKey(), entry.getValue()));
		}
		update.where(QueryBuilder.eq("id", id));
		cassandraTemplate.execute(update);
	}
	
	@Override
	public void update(ThingData thingData) {
		Update update = QueryBuilder.update(dataTable());
		update.setConsistencyLevel(ConsistencyLevel.QUORUM);
		Assignments assignments = update.with();
		assignments.and(QueryBuilder.set("value", thingData.getValue()));
		assignments.and(QueryBuilder.set("kind", thingData.getKind()));
		com.datastax.driver.core.querybuilder.Update.Where where = update.where();
		where.and(QueryBuilder.eq("thing_id", thingData.getThingId()));
		where.and(QueryBuilder.eq("key", thingData.getKey()));
		cassandraTemplate.execute(update);
	}
	
	@Override
	public void updateThing(Entity entity) {
		update(ThingUtils.extractThing(entity));
	}
	
	@Override
	public void updateThingData(Entity entity) {
		List<ThingData> thingDatas = ThingUtils.extractThingDatas(entity);
		for (ThingData thingData : thingDatas) {
			update(thingData);
		}
	}
	
	@Override
	public void deleteById(ID id) {
		deleteThingById(id);
	}
	
	@Override
	public void deleteThingById(ID id) {
		deleteThingDataById(id);
		Delete delete = QueryBuilder.delete().from(thingTable());
		delete.setConsistencyLevel(ConsistencyLevel.ALL);
		delete.where(QueryBuilder.eq("id", id));
		cassandraTemplate.execute(delete);
	}
	
	@Override
	public void deleteThingDataById(ID id) {
		Delete delete = QueryBuilder.delete().from(dataTable());
		delete.setConsistencyLevel(ConsistencyLevel.ALL);
		delete.where(QueryBuilder.eq("thing_id", id));
		cassandraTemplate.execute(delete);
	}
	
	@Override
	public void deleteThingDataByIdAndKey(ID id, String key) {
		Delete delete = QueryBuilder.delete().from(dataTable());
		delete.setConsistencyLevel(ConsistencyLevel.ALL);
		delete.where().and(QueryBuilder.eq("thing_id", id))
			.and(QueryBuilder.eq("key", key));
		cassandraTemplate.execute(delete);
	}
	
	@Override
	public Entity getOneById(ID id) {
		String tCQL = "select * from " + thingTable() + " where id = ?";
		String dCQL = "select * from " + dataTable() + " where thing_id = ?";
		List<Entity> entities = getAll(tCQL, new Object[]{id} , dCQL, new Object[]{id});
		return entities.size() == 1 ? entities.get(0) : null;
	}
	
	@Override
	public Entity getOneByParams(Map<String, Object> params) {
		List<Entity> entities = getAllByParams(params);
		return entities.size() == 1 ? entities.get(0) : null;
	}
	
	@Override
	public List<Entity> getAll() {
		String tCQL = "select * from " + thingTable();
		String dCQL = "select * from " + dataTable() + " where thing_id = ?";
		return getAll(tCQL, null, dCQL, null);
	}
	
	@Override
	public List<Entity> getAllByParams(Map<String, Object> params) {
		Map<String, Object> tAttris = new HashMap<String, Object>();
		Map<String, Object> dAttris = new HashMap<String, Object>();
		for (Map.Entry<String, Object> entry : params.entrySet()) {
			String attribute = entry.getKey();
			Object value = entry.getValue();
			if (ThingUtils.isBasicAttribute(attribute)) {
				tAttris.put(attribute, value);
			} else {
				dAttris.put(attribute, value);
			}
		}
		int tAttrisLen = tAttris.size();
		int dAttrisLen = dAttris.size();
		if (dAttrisLen > 1) {
			LOG.error("Only one data-attribute query condition is supported for now");
		}
		if (tAttrisLen > 0 && dAttrisLen == 0) {
			return getAllByThingParams(tAttris);
		} else if (tAttrisLen == 0 && dAttrisLen > 0) {
			return getAllByDataParams(dAttris);
		} else if (tAttrisLen > 0 && dAttrisLen > 0) {
			return getAllByThingAndDataParams(tAttris, dAttris);
		}
		return getAll();
	}
	
	@Override
	public QueryResult<Entity> getAll(Query query) {
		int tAttrisLen = query.getBasicAttributesLength();
		int dAttrisLen = query.getDataAttributesLength();
		QueryResult<Entity> results = new QueryResult<Entity>();
		if (tAttrisLen > 0 && dAttrisLen == 0) {
			results = getAllByThingParams(query);
		} else if (tAttrisLen == 0 && dAttrisLen > 0) {
			results =  getAllByDataParams(query);
		} else if (tAttrisLen > 0 && dAttrisLen > 0) {
			results = getAllByThingAndDataParams(query);
		} else {
			List<Entity> entities = getAll();
			results.setResultList(entities);
			results.setTotalRowNum(entities.size());
		}
		return results;
	}
	
	@Override
	public int getCount(Query query) {
		Long count = null;
		String tCountCQL = CQLBuilder.buildThingCountCQL(entityClass, query);
		QueryOptions options = new QueryOptions();
		options.setConsistencyLevel(org.springframework.cassandra.core.ConsistencyLevel.ONE);
		options.setRetryPolicy(RetryPolicy.DEFAULT);
		ResultSet resultSet = cassandraTemplate.query(tCountCQL, options);
		Iterator<Row> iterator = resultSet.iterator();
		while (iterator.hasNext()) {
			Row row = iterator.next();
			count = row.getLong(0);
		}
		return null == count ? 0 : count.intValue();
	}
	
	private List<Entity> getAll(String tCQL, Object[] tParams, String dCQL, Object[] dParams) {
		List<Entity> entities = new ArrayList<Entity>();
		ResultSet tResultSet = null;
		if (null == tParams || tParams.length == 0) {
			tResultSet = getSession().execute(tCQL);
		} else {
			tResultSet = getSession().execute(tCQL, tParams);
		}
		Iterator<Row> iterator = tResultSet.iterator();
		while (iterator.hasNext()) {
			Row row = iterator.next();
			Entity entity = newEntityInstance();
			rowToEntityCommonProps(row, entity);
			Object[] params = null;
			if (null == dParams || dParams.length == 0) {
				params = new Object[]{row.getLong("id")};
			} else {
				params = dParams;
			}
			ResultSet dResultSet = getSession().execute(dCQL, params);
			resultSetToEntity(dResultSet, entity);
			entities.add(entity);
		}
		return entities;
	}
	
	private List<Entity> getAllByThingParams(Map<String, Object> params) {
		List<Entity> entities = new ArrayList<Entity>();
		Object[] tCQL = CQLBuilder.buildThingCQLAndParams(entityClass, params);
		ResultSet resultSet = getSession().execute(String.valueOf(tCQL[0]), (Object[]) tCQL[1]);
		String dCQL = "select * from " + dataTable() + " where thing_id = ?";
		Iterator<Row> iterator = resultSet.iterator();
		while (iterator.hasNext()) {
			Row row = iterator.next();
			Entity entity = newEntityInstance();
			rowToEntityCommonProps(row, entity);
			ResultSet dResultSet = getSession().execute(dCQL, new Object[]{row.getLong("id")});
			resultSetToEntity(dResultSet, entity);
			entities.add(entity);
		}
		return entities;
	}
	
	private QueryResult<Entity> getAllByThingParams(Query query) {
		List<Entity> entities = new ArrayList<Entity>();
		String tCQL = CQLBuilder.buildThingCQL(entityClass, query);
		QueryOptions options = new QueryOptions();
		options.setConsistencyLevel(org.springframework.cassandra.core.ConsistencyLevel.ONE);
		options.setRetryPolicy(RetryPolicy.DEFAULT);
		ResultSet resultSet = cassandraTemplate.query(tCQL, options);
		StringBuilder dCQLPrefix = new StringBuilder();
		dCQLPrefix.append("select * from ").append(dataTable()).append(" where thing_id = ");
		Iterator<Row> iterator = resultSet.iterator();
		while (iterator.hasNext()) {
			Row row = iterator.next();
			Entity entity = newEntityInstance();
			rowToEntityCommonProps(row, entity);
			StringBuilder dCQL = new StringBuilder(dCQLPrefix);
			dCQL.append(row.getLong("id"));
			ResultSet dResultSet = cassandraTemplate.query(dCQL.toString(), options);
			resultSetToEntity(dResultSet, entity);
			entities.add(entity);
		}
		return new QueryResult<Entity>(getCount(query), entities);
	}
	
	private List<Entity> getAllByThingAndDataParams(Map<String, Object> tParams, Map<String, Object> dParams) {
		List<Entity> entities = new ArrayList<Entity>();
		Object[] tCQL = CQLBuilder.buildThingCQLAndParams(entityClass, tParams);
		ResultSet resultSet = getSession().execute(String.valueOf(tCQL[0]), (Object[]) tCQL[1]);
		Iterator<Row> iterator = resultSet.iterator();
		while (iterator.hasNext()) {
			Row row = iterator.next();
			dParams.put("thingId", row.getLong("id"));
			Object[] dCQL = CQLBuilder.buildDataCQLAndParams(entityClass, dParams);
			ResultSet dResultSet = getSession().execute(String.valueOf(dCQL[0]), (Object[]) dCQL[1]);
			if (dResultSet.all().size() == 0) continue;
			Entity entity = newEntityInstance();
			rowToEntityCommonProps(row, entity);
			String cql = "select * from " + dataTable() + " where thing_id = ?";
			ResultSet fResultSet = getSession().execute(cql, new Object[]{row.getLong("id")});
			resultSetToEntity(fResultSet, entity);
			entities.add(entity);
		}
		return entities;
	}
	
	@SuppressWarnings("unused")
	private QueryResult<Entity> getAllByThingAndOneDataParams(Query query) {
		List<Entity> entities = new ArrayList<Entity>();
		String tCQL = CQLBuilder.buildThingCQL(entityClass, query);
		QueryOptions options = new QueryOptions();
		options.setConsistencyLevel(org.springframework.cassandra.core.ConsistencyLevel.ONE);
		options.setRetryPolicy(RetryPolicy.DEFAULT);
		ResultSet resultSet = cassandraTemplate.query(tCQL, options);
		Iterator<Row> iterator = resultSet.iterator();
		while (iterator.hasNext()) {
			Row row = iterator.next();
			query.addCondition("thingId", row.getLong("id"));
			String dCQL = CQLBuilder.buildOneConditionDataCQL(entityClass, query);
			ResultSet dResultSet = cassandraTemplate.query(dCQL, options);
			if (dResultSet.all().size() == 0) continue;
			Entity entity = newEntityInstance();
			rowToEntityCommonProps(row, entity);
			String cql = "select * from " + dataTable() + " where thing_id = ?";
			ResultSet fResultSet = getSession().execute(cql, new Object[]{row.getLong("id")});
			resultSetToEntity(fResultSet, entity);
			entities.add(entity);
		}
		return new QueryResult<Entity>(entities.size(), entities);
	}
	
	private QueryResult<Entity> getAllByThingAndDataParams(Query query) {
		List<Entity> entities = new ArrayList<Entity>();
		String tCQL = CQLBuilder.buildThingCQL(entityClass, query);
		QueryOptions options = new QueryOptions();
		options.setConsistencyLevel(org.springframework.cassandra.core.ConsistencyLevel.ONE);
		options.setRetryPolicy(RetryPolicy.DEFAULT);
		ResultSet resultSet = cassandraTemplate.query(tCQL, options);
		Iterator<Row> iterator = resultSet.iterator();
		while (iterator.hasNext()) {
			Row row = iterator.next();
			query.addCondition("thingId", row.getLong("id"));
			List<String> cqls = CQLBuilder.buildMultiConditionDataCQL(entityClass, query);
			int hitCount = 0;
			for (String cql : cqls) {
				ResultSet dResultSet = cassandraTemplate.query(cql, options);
				if (dResultSet.all().size() == 0) continue;
				hitCount += 1;
			}
			if (cqls.size() != hitCount) continue;
			Entity entity = newEntityInstance();
			rowToEntityCommonProps(row, entity);
			String cql = "select * from " + dataTable() + " where thing_id = ?";
			ResultSet fResultSet = getSession().execute(cql, new Object[]{row.getLong("id")});
			resultSetToEntity(fResultSet, entity);
			entities.add(entity);
		}
		return new QueryResult<Entity>(entities.size(), entities);
	}
	
	@SuppressWarnings("unchecked")
	private List<Entity> getAllByDataParams(Map<String, Object> params) {
		List<Entity> entities = new ArrayList<Entity>();
		Set<ID> ids = new HashSet<ID>();
		Object[] dCQL = CQLBuilder.buildDataCQLAndParams(entityClass, params);
		ResultSet resultSet = getSession().execute(String.valueOf(dCQL[0]), (Object[]) dCQL[1]);
		Iterator<Row> iterator = resultSet.iterator();
		while (iterator.hasNext()) {
			Row row = iterator.next();
			ID id = (ID) new Long(row.getLong("thing_id"));
			if (ids.contains(id)) continue;
			ids.add(id);
			Entity entity = getOneById(id);
			if (null != entity) entities.add(entity);
		}
		return entities;
	}
	
	@SuppressWarnings({ "unchecked", "unused" })
	private QueryResult<Entity> getAllByOneDataParams(Query query) {
		Set<ID> ids = new HashSet<ID>();
		String dCQL = CQLBuilder.buildOneConditionDataCQL(entityClass, query);
		QueryOptions options = new QueryOptions();
		options.setConsistencyLevel(org.springframework.cassandra.core.ConsistencyLevel.ONE);
		options.setRetryPolicy(RetryPolicy.DEFAULT);
		ResultSet resultSet = cassandraTemplate.query(dCQL, options);
		Iterator<Row> iterator = resultSet.iterator();
		while (iterator.hasNext()) {
			Row row = iterator.next();
			ID id = (ID) new Long(row.getLong("thing_id"));
			if (ids.contains(id)) continue;
			ids.add(id);
		}
		List<Entity> entities = new ArrayList<Entity>();
		for (ID id : ids) {
			Entity entity = getOneById(id);
			if (null != entity) entities.add(entity);
		}
		return new QueryResult<Entity>(entities.size(), entities);
	}
	
	@SuppressWarnings("unchecked")
	private QueryResult<Entity> getAllByDataParams(Query query) {
		List<String> cqls = CQLBuilder.buildMultiConditionDataCQL(entityClass, query);
		List<Set<ID>> idList = new ArrayList<Set<ID>>();
		for (String cql : cqls) {
			Set<ID> ids = new HashSet<ID>();
			QueryOptions options = new QueryOptions();
			options.setConsistencyLevel(org.springframework.cassandra.core.ConsistencyLevel.ONE);
			options.setRetryPolicy(RetryPolicy.DEFAULT);
			ResultSet resultSet = cassandraTemplate.query(cql, options);
			Iterator<Row> iterator = resultSet.iterator();
			while (iterator.hasNext()) {
				Row row = iterator.next();
				ID id = (ID) new Long(row.getLong("thing_id"));
				if (ids.contains(id)) continue;
				ids.add(id);
			}
			idList.add(ids);
		}
		Set<ID> ids = new HashSet<ID>();
		if (idList.size() > 0) {
			for (ID id : idList.get(0)) {
				boolean flag = true;
				for (Set<ID> tmpIds : idList) {
					if (!tmpIds.contains(id)) {
						flag = false;
						break;
					}
				}
				if (flag) {
					ids.add(id);
				}
			}
		}
		List<Entity> entities = new ArrayList<Entity>();
		for (ID id : ids) {
			Entity entity = getOneById(id);
			if (null != entity) entities.add(entity);
		}
		return new QueryResult<Entity>(entities.size(), entities);
	}
	
	protected void rowToEntityCommonProps(Row row, Entity entity) {
		ThingUtils.setValueBySetMethod(entity, row.getLong("id"), "setId", Long.class);
		ThingUtils.setValueBySetMethod(entity, row.getInt("ups"), "setUps", Integer.class);
		ThingUtils.setValueBySetMethod(entity, row.getInt("downs"), "setDowns", Integer.class);
		ThingUtils.setValueBySetMethod(entity, row.getBool("deleted"), "setDeleted", Boolean.class);
		ThingUtils.setValueBySetMethod(entity, row.getBool("spam"), "setSpam", Boolean.class);
		ThingUtils.setValueBySetMethod(entity, row.getDate("date"), "setDate", Date.class);
	}
	
	protected void resultSetToEntity(ResultSet resultSet, Entity entity) {
		Map<String, Object> map = new HashMap<String, Object>();
		Iterator<Row> iterator = resultSet.iterator();
		while (iterator.hasNext()) {
			Row row = iterator.next();
			String kind = row.getString("kind");
			String value = row.getString("value");
			map.put(row.getString("key"), ThingUtils.getValueByKind(kind, value));
		}
		if (map.size() == 0) return;
		ThingUtils.convertMapToObject(map, entity);
	}
	
}

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;
import org.springframework.data.cassandra.mapping.Column;

public class ThingUtils {
	
	public static Logger LOG = Logger.getLogger(ThingUtils.class);
	
	private static Set<String> basicAttributes = null;
	
	static {
		basicAttributes = new HashSet<String>();
		Field[] fields = Thing.class.getDeclaredFields();
		try {
			for (Field field : fields) {
				String name = field.getName();
				if ("serialVersionUID".equalsIgnoreCase(name)) continue;
				basicAttributes.add(name);
			}
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
	}
	
	public static String thingTable(Class<?> clazz) {
		return "db_thing_" + clazz.getSimpleName().toLowerCase();
	}
	
	public static String dataTable(Class<?> clazz) {
		return "db_data_" + clazz.getSimpleName().toLowerCase();
	}
	
	public static boolean isBasicAttribute(String attribute) {
		return basicAttributes.contains(attribute) ? true : false;
	}
	
	public static Long extractId(Object object) {
		Field[] superFields = object.getClass().getSuperclass().getDeclaredFields();
		try {
			for (Field field : superFields) {
				String name = field.getName();
				if (!"id".equalsIgnoreCase(name)) continue;
				field.setAccessible(true);
				Object value = field.get(object);
				field.setAccessible(false);
				return (Long) value;
			}
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
		return null;
	}
	
	public static Thing extractThing(Object object) {
		Thing thing = new Thing();
		Field[] fields = Thing.class.getDeclaredFields();
		try {
			for (Field field : fields) {
				String name = field.getName();
				if ("serialVersionUID".equalsIgnoreCase(name)) continue;
				Field objField = object.getClass().getDeclaredField(name);
				if (null == objField) continue;
				objField.setAccessible(true);
				Object objValue = objField.get(object);
				objField.setAccessible(false);
				field.setAccessible(true);
				field.set(thing, objValue);
				field.setAccessible(false);
			}
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
		return thing;
	}
	
	public static List<ThingData> extractThingDatas(Object object) {
		List<ThingData> thingDatas = new ArrayList<ThingData>();
		Field[] fields = object.getClass().getDeclaredFields();
		try {
			ThingData thingData = null;
			for (Field field : fields) {
				String name = field.getName();
				if ("serialVersionUID".equalsIgnoreCase(name)) continue;
				field.setAccessible(true);
				Object value = field.get(object);
				field.setAccessible(false);
				if (null == value) continue;
				LOG.debug(name + " : " + value);
				thingData = new ThingData();
				thingData.setThingId(extractId(object));
				thingData.setKey(name);
				thingData.setValue(String.valueOf(value));
				thingData.setKind(getKindByValue(value));
				thingDatas.add(thingData);
			}
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
		return thingDatas;
	}
	
	// flag: true = key the map by the @Column annotation name, false = key by the field name
	public static Map<String, Object> convertObjectToMap(Object object, boolean flag) {
		Map<String, Object> map = new HashMap<String, Object>();
		Field[] fields = object.getClass().getDeclaredFields();
		try {
			for (Field field : fields) {
				String name = field.getName();
				if ("serialVersionUID".equalsIgnoreCase(name)) continue;
				field.setAccessible(true);
				Object value = field.get(object);
				field.setAccessible(false);
				if (null == value) continue;
				String key = name;
				if (flag) {
					String columnName = getColumnName(field);
					if (null != columnName) {
						key = columnName;
					}
				}
				LOG.debug(key + " : " + value);
				map.put(key, value);
			}
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
		return map;
	}
	
	public static String getColumnName(Field field) {
		Column column = field.getAnnotation(Column.class);
		return null != column ? column.value() : null;
	}
	
	public static void convertMapToObject(Map<String, Object> map, Object object) {
		try {
			Method[] methods = object.getClass().getMethods();
			for (Method method : methods) {
				String mname = method.getName();
				// Skip non-setters; also guard against "set" itself and multi-arg overloads,
				// which would otherwise throw on charAt(3) or invoke().
				if (!mname.startsWith("set") || method.getParameterTypes().length != 1) continue;
				String name = Character.toLowerCase(mname.charAt(3)) + mname.substring(4);
				if (null != map.get(name)) {
					method.invoke(object, map.get(name));
				}
			}
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
	}
	
	public static Object dataToObject(Thing thing, List<ThingData> thingDatas, Class<?> entityClass) {
		Object entity = null;
		try {
			entity = entityClass.getDeclaredConstructor().newInstance();
			Map<String, Object> tmap = convertObjectToMap(thing, false);
			for (ThingData thingData : thingDatas) {
				tmap.put(thingData.getKey(), thingData.getValue());
			}
			Method[] methods = entityClass.getMethods();
			for (Method method : methods) {
				String mname = method.getName();
				if (!mname.startsWith("set") || method.getParameterTypes().length != 1) continue;
				String name = Character.toLowerCase(mname.charAt(3)) + mname.substring(4);
				if (null != tmap.get(name)) {
					method.invoke(entity, tmap.get(name));
				}
			}
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
		return entity;
	}
	
	public static void setValueBySetMethod(Object object, Object value, String methodName, Class<?>... parameterTypes) {
		try {
			object.getClass().getMethod(methodName, parameterTypes).invoke(object, value);
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
	}
	
	public static void setValuesBySetMethod(Object object, Object values[], String methodName, Class<?>... parameterTypes) {
		try {
			object.getClass().getMethod(methodName, parameterTypes).invoke(object, values);
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
	}
	
	public static Object getValueByMethodName(Object object, String methodName) {
		return getValueByMethodName(object, new Object[0], methodName, new Class[0]);
	}
	
	public static Object getValueByMethodName(Object object, Object values[], String methodName, Class<?>... parameterTypes) {
		Object returnValue = null;
		try {
			returnValue = object.getClass().getMethod(methodName, parameterTypes).invoke(object, values);
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} 
		return returnValue;
	}
	
	public static Object getValueByKind(String kind, String value) {
		Object finalValue = value;
		if (kind.equalsIgnoreCase(Kind.INTEGER.getName())) {
			finalValue = Integer.parseInt(value);
		} else if (kind.equalsIgnoreCase(Kind.LONG.getName())) {
			finalValue = Long.parseLong(value);
		} else if (kind.equalsIgnoreCase(Kind.FLOAT.getName())) {
			finalValue = Float.parseFloat(value);
		} else if (kind.equalsIgnoreCase(Kind.DOUBLE.getName())) {
			finalValue = Double.parseDouble(value);
		} else if (kind.equalsIgnoreCase(Kind.DATE.getName())) {
			try {
				finalValue = DateFormat.timeFormat.get().parse(value);
			} catch (ParseException e) {
				LOG.error(e.getMessage(), e);
			}
		}
		return finalValue;
	}
	
	public static String getKindByValue(Object value) {
		String kind = Kind.STRING.getName();
		if (value instanceof Integer) {
			kind = Kind.INTEGER.getName();
		} else if (value instanceof Long) {
			kind = Kind.LONG.getName();
		} else if (value instanceof Float) {
			kind = Kind.FLOAT.getName();
		} else if (value instanceof Double) {
			kind = Kind.DOUBLE.getName();
		} else if (value instanceof Date) {
			kind = Kind.DATE.getName();
		} 
		return kind;
	}
	
	public static void main(String[] args) throws Exception {
	}
	
}
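The core trick in ThingUtils — mirroring an object's declared fields into a Map and rebuilding an object by deriving property names from its setters — can be exercised in isolation. The sketch below is a minimal, self-contained version of that round trip; the `Item` POJO and the `toMap`/`fromMap` helpers are hypothetical stand-ins (no Thing, ThingData, Kind, or Spring dependency needed):

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class ReflectionRoundTrip {

	// Hypothetical entity standing in for a Thing subclass.
	public static class Item {
		private String name;
		private Long size;
		public void setName(String name) { this.name = name; }
		public String getName() { return name; }
		public void setSize(Long size) { this.size = size; }
		public Long getSize() { return size; }
	}

	// Read every non-null declared field into a map keyed by field name,
	// the same pattern as ThingUtils.convertObjectToMap.
	static Map<String, Object> toMap(Object object) throws Exception {
		Map<String, Object> map = new HashMap<String, Object>();
		for (Field field : object.getClass().getDeclaredFields()) {
			field.setAccessible(true);
			Object value = field.get(object);
			if (value != null) map.put(field.getName(), value);
		}
		return map;
	}

	// Derive the property name from each setXxx method and invoke it with
	// the matching map entry, as ThingUtils.convertMapToObject does.
	static void fromMap(Map<String, Object> map, Object object) throws Exception {
		for (Method method : object.getClass().getMethods()) {
			String mname = method.getName();
			if (!mname.startsWith("set") || method.getParameterTypes().length != 1) continue;
			String name = Character.toLowerCase(mname.charAt(3)) + mname.substring(4);
			Object value = map.get(name);
			if (value != null) method.invoke(object, value);
		}
	}

	public static void main(String[] args) throws Exception {
		Item src = new Item();
		src.setName("disk");
		src.setSize(42L);
		Item copy = new Item();
		fromMap(toMap(src), copy);
		System.out.println(copy.getName() + ":" + copy.getSize()); // prints "disk:42"
	}
}
```

Note the single-argument guard on the setter loop: without it, a method named exactly `set`, or a multi-argument `setXxx` overload, would break the `charAt(3)` name derivation or the `invoke` call.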



