欢迎投稿

今日深度:

cassandra java_Cassandra安装与Java操作,经过查阅资料和比较后

cassandra java_Cassandra安装与Java操作,经过查阅资料和比较后


由于项目需要高并发的写入数据操作,原有的关系型数据库很难满足这部分的数据要求。经过查阅资料和比较后准备采用Cassandra来解决高写入的问题。

同时由于Cassandra可以和Hadoop很好的集成,为以后的在线分析也做了比较好的铺垫。

下面就是我做的一些测试。

1、获取Cassandra介质

下载并解压缩apache-cassandra-1.1.2-bin.tar.gz

2、运行配置Cassandra

执行以下指令启动。

./cassandra -f

f的意思是前台运行,可以随时Ctrl+C终止。

java.io.IOError: java.io.IOException: unable to mkdirs /var/lib/cassandra/data/system/schema_columnfamilies

会出错,因为这个用户没有创建/var/lib/cassandra/的权限(也可以给用户增加/var/lib的权限)

需要修改cassandra.yaml中的以下配置的路径

data_file_directories:

- /Nautilus/cassandra/data

commitlog_directory: /Nautilus/cassandra/commitlog

saved_caches_directory: /Nautilus/cassandra/saved_caches

修改log4j-server.properties中指定的log路径

log4j.appender.R.File=/Nautilus/cassandra/system.log

3、使用cassandra-cli测试

格式:bin/cassandra-cli -h host -p rpc_port

这里我们执行以下指令

./cassandra-cli -h 127.0.0.1 -p 9160

Connected to: "Test Cluster" on 127.0.0.1/9160

Welcome to Cassandra CLI version 1.1.2

Type 'help;' or '?' for help.

Type 'quit;' or 'exit;' to quit.

[default@unknown]

成功登录cli

4、创建测试keyspace

创建一个叫NOAH的keyspace

[default@unknown] create keyspace NOAH;

5a7a3b46-c006-3a22-9c84-e3b50662c8fa

Waiting for schema agreement...

... schemas agree across the cluster

使用NOAH的keyspace

[default@unknown] use NOAH;

Authenticated to keyspace: NOAH

[default@NOAH]

5、创建测试column family

创建一个叫Users的column family

[default@NOAH] create column family Users;

e6ae2c83-0f71-3882-be08-0275b894f261

Waiting for schema agreement...

... schemas agree across the cluster

6、存入一些测试数据到Users

[default@NOAH] set Users[utf8('1234')][utf8('name')] = utf8('张三');

Value inserted.

Elapsed time: 62 msec(s)

[default@NOAH] set Users[utf8('1234')][utf8('password')] = utf8('abcd');

Value inserted.

Elapsed time: 8 msec(s).

7、查询插入的测试数据

[default@NOAH] get Users[utf8('1234')];

=> (column=6e616d65, value=张三, timestamp=1344228470688000)

=> (column=70617373776f7264, value=abcd, timestamp=1344228523417000)

Returned 2 results.

Elapsed time: 53 msec(s).

我们可以观察到列名并没有显示为实际的字符串,需要使用assume指令

[default@NOAH] assume Users keys as utf8;

Assumption for column family 'Users' added successfully.

[default@NOAH] assume Users comparator as utf8;

Assumption for column family 'Users' added successfully.

[default@NOAH] assume Users validator as utf8;

Assumption for column family 'Users' added successfully.

[default@NOAH] get Users[utf8('1234')];

=> (column=name, value=张三, timestamp=1344228470688000)

=> (column=password, value=abcd, timestamp=1344228523417000)

Returned 2 results.

Elapsed time: 3 msec(s)

8、java操作cassandra(thrift)

源文件

package org.noahx.cassandra;

import org.apache.cassandra.thrift.*;

import org.apache.thrift.TException;

import org.apache.thrift.protocol.TBinaryProtocol;

import org.apache.thrift.protocol.TProtocol;

import org.apache.thrift.transport.TFramedTransport;

import org.apache.thrift.transport.TSocket;

import org.apache.thrift.transport.TTransport;

import java.io.UnsupportedEncodingException;

import java.nio.ByteBuffer;

import java.util.List;

/**

* Created with IntelliJ IDEA.

* User: noah

* Date: 8/6/12

* Time: 1:11 PM

* To change this template use File | Settings | File Templates.

*/

public class TestClient {

public static void main(String[] args)

throws TException, InvalidRequestException, UnavailableException, UnsupportedEncodingException, NotFoundException, TimedOutException {

TTransport tr = new TFramedTransport(new TSocket("localhost", 9160));

TProtocol proto = new TBinaryProtocol(tr);

Cassandra.Client client = new Cassandra.Client(proto);

tr.open();

String key_user_id = "1234"; //row id

client.set_keyspace("NOAH"); //指定keyspace

ColumnParent parent = new ColumnParent("Users");

ColumnPath path = new ColumnPath("Users");

// 读取单列内容

path.setColumn(toByteBuffer("name"));

ColumnOrSuperColumn col = (client.get(toByteBuffer(key_user_id), path, ConsistencyLevel.ONE));

System.out.println(toString(col.column.name) + " -> " + toString(col.column.value));

// 读取全部列内容

SlicePredicate predicate = new SlicePredicate();

SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false, 10);

predicate.setSlice_range(sliceRange);

List results = client.get_slice(toByteBuffer(key_user_id), parent, predicate, ConsistencyLevel.ONE);

for (ColumnOrSuperColumn result : results) {

Column column = result.column;

System.out.println(toString(column.name) + " -> " + toString(column.value));

}

// 向数据库中插入数据

long currentTimeMillis = System.currentTimeMillis();

long nanoTime = System.nanoTime();

long currentTimeMicros = currentTimeMillis * 1000 + nanoTime / 1000 - (nanoTime > 1000000 ? (nanoTime / 1000000) * 1000 : 0);

//注意取microseconds,而不是milliseconds,因为其它客户机都是microseconds。

//如果使用System.currentTimeMillis()导致你的时间永远小于其它客户机的时间,使其操作会被忽略。

System.out.println(currentTimeMicros);

String new_key_user_id = "123456"; //row id

Column nameColumn = new Column(toByteBuffer("name"));

nameColumn.setValue(toByteBuffer("Chris Goffinet"));

nameColumn.setTimestamp(currentTimeMicros);

client.insert(toByteBuffer(new_key_user_id), parent, nameColumn, ConsistencyLevel.ONE);

Column pwdColumn = new Column(toByteBuffer("password"));

pwdColumn.setValue(toByteBuffer("12345678"));

pwdColumn.setTimestamp(currentTimeMicros);

client.insert(toByteBuffer(new_key_user_id), parent, pwdColumn, ConsistencyLevel.ONE);

tr.flush();

tr.close();

}

public static ByteBuffer toByteBuffer(String value)

throws UnsupportedEncodingException {

return ByteBuffer.wrap(value.getBytes("UTF-8"));

}

public static String toString(ByteBuffer buffer)

throws UnsupportedEncodingException {

byte[] bytes = new byte[buffer.remaining()];

buffer.get(bytes);

return new String(bytes, "UTF-8");

}

}

控制台输出结果

name -> 张三

name -> 张三

password -> abcd

1344233850099866

在cli中检查新插入的数据

[default@NOAH] get Users[utf8('123456')];

=> (column=name, value=Chris Goffinet, timestamp=1344233850099866)

=> (column=password, value=12345678, timestamp=1344233850099866)

Returned 2 results.

Elapsed time: 3 msec(s).

maven配置

org.apache.cassandra

cassandra-thrift

1.1.3

9、java操作cassandra(hector)

hector提供了更易用的操作api

源文件

package org.noahx.cassandra;

import me.prettyprint.cassandra.serializers.StringSerializer;

import me.prettyprint.hector.api.Cluster;

import me.prettyprint.hector.api.Keyspace;

import me.prettyprint.hector.api.beans.OrderedRows;

import me.prettyprint.hector.api.beans.Row;

import me.prettyprint.hector.api.factory.HFactory;

import me.prettyprint.hector.api.query.QueryResult;

import me.prettyprint.hector.api.query.RangeSlicesQuery;

import java.util.Iterator;

/**

* Created with IntelliJ IDEA.

* User: noah

* Date: 8/6/12

* Time: 5:25 PM

* To change this template use File | Settings | File Templates.

*/

public class TestClient2 {

public static void main(String[] args) {

Cluster cluster = HFactory.getOrCreateCluster("Test Cluster", "localhost:9160");

Keyspace keyspace = HFactory.createKeyspace("NOAH", cluster);

RangeSlicesQuery rangeSlicesQuery = HFactory

.createRangeSlicesQuery(keyspace, StringSerializer.get(), StringSerializer.get(), StringSerializer.get())

.setColumnFamily("Users")

.setRange(null, null, false, 10);

rangeSlicesQuery.setKeys(null, null);

QueryResult> result = rangeSlicesQuery.execute();

OrderedRows rows = result.get();

Iterator> rowsIterator = rows.iterator();

while (rowsIterator.hasNext()) {

Row row = rowsIterator.next();

if (row.getColumnSlice().getColumns().isEmpty()) {

continue;

}

System.out.println(row);

}

}

}

控制台输出结果

22 [main] INFO me.prettyprint.cassandra.connection.CassandraHostRetryService - Downed Host Retry service started with queue size -1 and retry delay 10s

141 [main] INFO me.prettyprint.cassandra.service.JmxMonitor - Registering JMX me.prettyprint.cassandra.service_Test Cluster:ServiceType=hector,MonitorType=hector

Row(123456,ColumnSlice([HColumn(name=Chris Goffinet), HColumn(password=12345678)]))

Row(1234,ColumnSlice([HColumn(name=张三), HColumn(password=abcd)]))

maven配置

org.hectorclient

hector-core

1.1-1

www.htsjk.Com true http://www.htsjk.com/cassandra/45908.html NewsArticle cassandra java_Cassandra安装与Java操作,经过查阅资料和比较后 由于项目需要高并发的写入数据操作原有的关系型数据库很难满足这部分的数据要求。经过查阅资料和比较后准备采用Cassand...
相关文章
    暂无相关文章
评论暂时关闭