Cassandra Data Distribution Test
Notes: dc = datacenter; node = server. Based on Cassandra 1.2.0. Cluster environment: 4 nodes, 2 datacenters (dc1, dc2), 2 nodes per dc.
Cassandra client: Hector (hector-core-1.1-2)
Goal: to better understand Cassandra's data distribution and replica placement strategy, and to verify how the Hector client is used.
Keyspace configuration:
CREATE KEYSPACE DTD
with placement_strategy = 'org.apache.cassandra.locator.NetworkTopologyStrategy'
and strategy_options = {'DC1' : 1 , 'DC2' : 1} # only 1 replica per dc
AND durable_writes = true;
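In the scenarios below, "the number of live nodes >= ConsistencyLevel.QUORUM" refers to the quorum derived from the sum of the per-dc replication factors of this keyspace. A minimal sketch of that arithmetic (QuorumMath is a hypothetical helper, not part of the original test; the standard Cassandra formula quorum = floor(RF_total / 2) + 1 is assumed):

// Hypothetical helper showing how QUORUM relates to the replication factors above.
public class QuorumMath {
    public static void main(String[] args) {
        int rfDc1 = 1, rfDc2 = 1;            // strategy_options = {'DC1' : 1, 'DC2' : 1}
        int totalReplicas = rfDc1 + rfDc2;   // 2 replicas cluster-wide
        int quorum = totalReplicas / 2 + 1;  // QUORUM = floor(2 / 2) + 1 = 2 replica acks
        int localQuorumDc1 = rfDc1 / 2 + 1;  // LOCAL_QUORUM in DC1 = 1
        System.out.println("QUORUM = " + quorum + ", LOCAL_QUORUM(DC1) = " + localQuorumDc1);
    }
}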
ColumnFamily configuration:
create column family TSG with comparator=UTF8Type
AND key_validation_class=UTF8Type
AND default_validation_class = UTF8Type
and column_metadata=[
{column_name: GNAME, validation_class: UTF8Type},
{column_name: PLAYNAME, validation_class: UTF8Type},
{column_name: PRIZEDATE, validation_class: UTF8Type}
]
and read_repair_chance=0.1
and dclocal_read_repair_chance=0.5;
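For reference, a minimal Hector write against the DTD/TSG definition above might look like the sketch below. The host is taken from the test code further down; the row key and column values are made-up placeholders, and Hector's default consistency policy applies since none is set here:

// Sketch: one row written into column family TSG of keyspace DTD via Hector.
Cluster cluster = HFactory.getOrCreateCluster("Test Cluster",
        new CassandraHostConfigurator("192.168.3.141:9160"));
Keyspace dtd = HFactory.createKeyspace("DTD", cluster);
Mutator<String> mutator = HFactory.createMutator(dtd, StringSerializer.get());
mutator.addInsertion("row-1", "TSG", HFactory.createStringColumn("GNAME", "game-1"))
       .addInsertion("row-1", "TSG", HFactory.createStringColumn("PLAYNAME", "player-1"))
       .addInsertion("row-1", "TSG", HFactory.createStringColumn("PRIZEDATE", "2013-01-01"));
mutator.execute();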
Scenario 1: insert test with the client set to ConsistencyLevel.ONE
Premise 1: stop both nodes of dc2, then perform the insert against dc1.
Verification 1.1.1: stop one dc1 node at a time and query; the newly inserted data can only be found on one of the two nodes.
Verification 1.1.2: with the dc1 and dc2 nodes all up again, after a while (>30s) dc2 also holds the data just inserted into dc1, verifying that nodes across the cluster update each other's data (via gossip, per the original notes).
Verification 1.1.3: stop the dc1 nodes, then stop one dc2 node at a time; this confirms the newly inserted data exists on only one of the dc2 nodes.
In addition, if dc2 is stopped, data1 is stored on node1 in dc1, and node1 is also stopped, querying through node2 throws HUnavailableException.
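A sketch of how scenario 1 can be driven from Hector: pin the keyspace to ConsistencyLevel.ONE and catch the HUnavailableException seen above explicitly. It reuses the cluster, keyspace (gxsim) and column family (BET) from the test code at the end of this post (plus the me.prettyprint.hector.api.exceptions.HUnavailableException import); the row key is a placeholder:

// Sketch: CL.ONE reads, with explicit handling of HUnavailableException when no replica is reachable.
ConfigurableConsistencyLevel onePolicy = new ConfigurableConsistencyLevel();
onePolicy.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE);
onePolicy.setDefaultWriteConsistencyLevel(HConsistencyLevel.ONE);
Keyspace ks = HFactory.createKeyspace("gxsim", cluster, onePolicy);
ColumnFamilyTemplate<String, String> tmpl =
        new ThriftColumnFamilyTemplate<String, String>(ks, "BET", StringSerializer.get(), StringSerializer.get());
try {
    ColumnFamilyResult<String, String> res = tmpl.queryColumns("some-row-key");
    System.out.println("row found on a live replica: " + res.hasResults());
} catch (HUnavailableException e) {
    // Thrown when the node holding the only replica of this row is down (verification 1.1.x).
    System.out.println("no live replica for this key: " + e.getMessage());
}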
Scenario 2: insert test with the client set to ConsistencyLevel.QUORUM
Premise 1: the number of live nodes in the cluster >= ConsistencyLevel.QUORUM, and at least 2 dcs have live nodes; perform the insert (with only 1 live datacenter an exception occurs; still to be verified).
Verification 2.1.1: the inserted data can be queried in each dc.
In addition, if data1 is stored on dc1-node1 and dc2-node3 and node3 is then stopped, the query still throws HUnavailableException even though the number of live nodes in the cluster >= ConsistencyLevel.QUORUM.
Premise 2: the number of live nodes in the cluster < ConsistencyLevel.QUORUM; perform the insert.
Verification 2.2.1: the insert fails, and neither dc contains the inserted data.
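Premise 2 can be reproduced with a sketch like the following: switch the policy to QUORUM, and the write itself is rejected with HUnavailableException once fewer replicas than the quorum are reachable, so nothing is stored in either dc. The names again come from the test code below, and the row key is a placeholder:

// Sketch: QUORUM write that fails when a quorum of replicas cannot be reached (verification 2.2.1).
ConfigurableConsistencyLevel quorumPolicy = new ConfigurableConsistencyLevel();
quorumPolicy.setDefaultReadConsistencyLevel(HConsistencyLevel.QUORUM);
quorumPolicy.setDefaultWriteConsistencyLevel(HConsistencyLevel.QUORUM);
Keyspace ks = HFactory.createKeyspace("gxsim", cluster, quorumPolicy);
Mutator<String> m = HFactory.createMutator(ks, StringSerializer.get());
m.addInsertion("some-row-key", "BET", HFactory.createStringColumn("TICKETID", "t-1"));
try {
    m.execute();   // with too few live nodes this throws, and no replica receives the write
} catch (HUnavailableException e) {
    System.out.println("quorum not reachable, insert rejected: " + e.getMessage());
}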
Scenario 3: delete test with the client set to ConsistencyLevel.ONE
Premise 1: stop both nodes of dc2, then perform the delete against dc1.
Verification 3.1.1: the data in dc1 is deleted successfully.
Premise 2: start the two nodes of dc2.
Verification 3.2.1: after a while (>30s), the data in dc2 is deleted as well.
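A sketch of scenario 3 built on the ColumnFamilyTemplate from the test code below (placeholder row key; the sleep simply mirrors the >30s observation and assumes the surrounding method declares throws InterruptedException):

// Sketch: CL.ONE delete against dc1, then re-query after dc2 has rejoined to confirm the tombstone propagated.
template.deleteRow("some-row-key");          // succeeds with only dc1 up (verification 3.1.1)
Thread.sleep(35000);                         // give the restarted dc2 nodes time to catch up (>30s)
ColumnFamilyResult<String, String> res = template.queryColumns("some-row-key");
System.out.println("row still visible? " + res.hasResults());   // expected: false (verification 3.2.1)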
Scenario 4: delete test with the client set to ConsistencyLevel.QUORUM
Premise 1: the number of live nodes in the cluster >= ConsistencyLevel.QUORUM; perform the delete.
Verification 4.1.1: the delete succeeds.
Premise 2: the number of live nodes in the cluster < ConsistencyLevel.QUORUM; perform the delete.
Verification 4.2.1: the delete fails.
Second test setup. Cassandra cluster environment: 4 nodes, 2 datacenters (dc1, dc2), 2 nodes per dc. This time every piece of data has 2 replicas in each dc.
Keyspace configuration:
CREATE KEYSPACE DTD
with placement_strategy = 'org.apache.cassandra.locator.NetworkTopologyStrategy'
and strategy_options = {'DC1' : 2 , 'DC2' : 2} # 2 replicas per dc
AND durable_writes = true;
Scenario 5:
Premise 1: with the 2 nodes of dc2 stopped, the insert succeeds. Then stop one dc1 node at a time.
Verification 5.1.1: the data can be queried on both dc1 nodes, proving that dc1 holds 2 replicas.
Premise 2: with live nodes in dc1, start the 2 nodes of dc2.
Verification 5.2.1: the data can be queried on each dc2 node, again showing that nodes across the cluster update each other's data and that dc2 also holds 2 replicas.
Verification 5.3.1: with only one live node remaining, delete the data on that node; after the other nodes are started, their copies of the data are also removed after a while.
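One way to script verifications 5.1.1 and 5.2.1 instead of stopping nodes by hand is to open a separate Hector connection per node with host auto-discovery turned off, so each probe is coordinated by a single node. Note that a CL.ONE read may still be served by another live replica, so the stop-the-other-nodes procedure above remains the authoritative check. Only 192.168.3.141 appears in this post; the other addresses are assumptions:

// Sketch: probe each node through its own pinned connection (assumed host list, CL.ONE reads).
String[] hosts = {"192.168.3.141:9160", "192.168.3.142:9160", "192.168.3.143:9160", "192.168.3.144:9160"};
ConfigurableConsistencyLevel one = new ConfigurableConsistencyLevel();
one.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE);
for (String host : hosts) {
    CassandraHostConfigurator conf = new CassandraHostConfigurator(host);
    conf.setAutoDiscoverHosts(false);        // keep this connection pinned to a single node
    Cluster probe = HFactory.getOrCreateCluster("probe-" + host, conf);
    Keyspace ks = HFactory.createKeyspace("DTD", probe, one);
    ColumnFamilyTemplate<String, String> t =
            new ThriftColumnFamilyTemplate<String, String>(ks, "TSG", StringSerializer.get(), StringSerializer.get());
    System.out.println(host + " -> row visible: " + t.queryColumns("row-1").hasResults());
    probe.getConnectionManager().shutdown();
}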
The test code is below (note that it targets keyspace gxsim and column family BET rather than the DTD/TSG definitions above):
// Hector and JDK imports required by this sample (the Bet POJO used below is not included in the original post).
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.serializers.UUIDSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.template.ColumnFamilyResult;
import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate;
import me.prettyprint.cassandra.service.template.ThriftColumnFamilyTemplate;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

public class BetSample {
final static Cluster cluster = HFactory.getOrCreateCluster("Test Cluster",
new CassandraHostConfigurator("192.168.3.141:9160"));
private static Keyspace keyspace = null;
private static ColumnFamilyTemplate<String, String> template = null;
final static StringSerializer ss = StringSerializer.get();
final static LongSerializer ls = LongSerializer.get();
final static IntegerSerializer is = IntegerSerializer.get();
final static UUIDSerializer us = UUIDSerializer.get();
final static String columnFamily = "BET";
/**
 * Save a Bet in Cassandra
 *
 * @param bet the Bet to be saved
 */
public void saveBet(Bet bet) {
Mutator<String> m1 = HFactory.createMutator(keyspace, ss);
m1.addInsertion(bet.getKey(), columnFamily, HFactory.createColumn("SN", bet.getSn(), ss, is))
.addInsertion(bet.getKey(), columnFamily, HFactory.createColumn("MULTITIMES", bet.getMultiTimes(), ss, is))
.addInsertion(bet.getKey(), columnFamily, HFactory.createColumn("WINCOUNT", bet.getWinCount(), ss, is))
.addInsertion(bet.getKey(), columnFamily, HFactory.createStringColumn("TICKETID", bet.getTicketId()))
.addInsertion(bet.getKey(), columnFamily, HFactory.createStringColumn("BETNUM", bet.getBetNum()))
.addInsertion(bet.getKey(), columnFamily, HFactory.createStringColumn("PRIZE", bet.getPrize()))
.addInsertion(bet.getKey(), columnFamily, HFactory.createStringColumn("PLAYTYPEID", bet.getPlayTypeId()))
.addInsertion(bet.getKey(), columnFamily, HFactory.createColumn("BETTAX", bet.getBetTax(), ss, ls))
.addInsertion(bet.getKey(), columnFamily, HFactory.createStringColumn("WINLEVEL", bet.getWinLevel()));
m1.execute();
}
public void selectByPrimaryKey(String key){
ColumnFamilyResult<String, String> result = template.queryColumns(key);
System.out.println("--------------" + result);
}
public void delete(String key){
template.deleteRow(key);
System.out.println("delete [" + key + "] down.");
}
public static void main(String[] args) {
ConfigurableConsistencyLevel cl = new ConfigurableConsistencyLevel();
Map<String, HConsistencyLevel> clmap = new HashMap<String, HConsistencyLevel>();
// clmap.put(columnFamily, HConsistencyLevel.QUORUM);
// cl.setDefaultReadConsistencyLevel(HConsistencyLevel.QUORUM);
// cl.setDefaultWriteConsistencyLevel(HConsistencyLevel.QUORUM);
// clmap.put(columnFamily, HConsistencyLevel.ALL);
// cl.setDefaultReadConsistencyLevel(HConsistencyLevel.ALL);
// cl.setDefaultWriteConsistencyLevel(HConsistencyLevel.ALL);
// clmap.put(columnFamily, HConsistencyLevel.LOCAL_QUORUM);
// cl.setDefaultReadConsistencyLevel(HConsistencyLevel.LOCAL_QUORUM);
// cl.setDefaultWriteConsistencyLevel(HConsistencyLevel.LOCAL_QUORUM);
clmap.put(columnFamily, HConsistencyLevel.TWO);
cl.setDefaultReadConsistencyLevel(HConsistencyLevel.TWO);
cl.setDefaultWriteConsistencyLevel(HConsistencyLevel.TWO);
// clmap.put(columnFamily, HConsistencyLevel.ONE);
// cl.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE);
// cl.setDefaultWriteConsistencyLevel(HConsistencyLevel.ONE);
cl.setReadCfConsistencyLevels(clmap);
cl.setWriteCfConsistencyLevels(clmap);
keyspace = HFactory.createKeyspace("gxsim", cluster, cl);
template = new ThriftColumnFamilyTemplate<String, String>(keyspace, columnFamily, ss, ss);
BetSample sample = new BetSample();
sample.save();
// sample.selectByPrimaryKey("836fcc5f-29f2-4013-b219-a8cd569ad1fe");
// sample.delete("970bf9b6-b9a5-4e58-8926-9252d78c1a18");
// sample.select();
cluster.getConnectionManager().shutdown();
}
public void save(){
int count = 1;
System.out.println("Saving bet ....");
for (int i = 0; i < count; i++) {
Bet bet = new Bet(UUID.randomUUID().toString(),"ticket"+i, i, "playtype"+i, i, ""+i, "" , i, ""+i, (long)i);
System.out.println("Saving bet # : "+ bet);
saveBet(bet);
}
}
} // end of class BetSample (closing brace missing in the original post)