
Cassandra Driver Getting-Started Tutorial (2): Connecting to the Database Asynchronously


1 Obtain the Cluster object

Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();


2 Obtain a ListenableFuture<Session>

ListenableFuture<Session> session = cluster.connectAsync();


3 Run the asynchronous query

ListenableFuture<ResultSet> rs = Futures.transform(session, new AsyncFunction<Session, ResultSet>(){

			@Override
			public ListenableFuture<ResultSet> apply(Session session) throws Exception {
				return session.executeAsync("select release_version from system.local");
			}
			
		});
		
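Once the session is ready, the AsyncFunction runs and produces a new asynchronous result set. With Guava 19 or later, the same chaining is written with Futures.transformAsync.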
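The chaining behaviour described above can be tried without a running Cassandra node. The following is a minimal sketch that uses the JDK's CompletableFuture as a stand-in for Guava's ListenableFuture; it is not the driver API. Here thenCompose plays the role of the AsyncFunction transform, since the function it receives returns a future itself. FakeSession, connectAsync, queryVersion and the value "3.0.0" are hypothetical placeholders introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: CompletableFuture stands in for Guava's ListenableFuture,
// and FakeSession is a hypothetical placeholder for the driver's Session.
class AsyncChainSketch {

    static class FakeSession {
        // Pretends to run a query asynchronously; "3.0.0" is a made-up value.
        CompletableFuture<String> executeAsync(String query) {
            return CompletableFuture.supplyAsync(() -> "3.0.0");
        }
    }

    // Pretends to open a session asynchronously.
    static CompletableFuture<FakeSession> connectAsync() {
        return CompletableFuture.supplyAsync(FakeSession::new);
    }

    static CompletableFuture<String> queryVersion() {
        // thenCompose calls the function only after the session future completes
        // and flattens the future it returns, much like an AsyncFunction transform.
        return connectAsync().thenCompose(
                session -> session.executeAsync("select release_version from system.local"));
    }

    public static void main(String[] args) {
        // join() blocks here only so the demo can print the value.
        System.out.println(queryVersion().join());
    }
}
```

Nothing in queryVersion blocks the calling thread; only the join() in main waits for the value.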

4 Read the result set synchronously

ListenableFuture<String> version = Futures.transform(rs, new Function<ResultSet,String>(){

			@Override
			public String apply(ResultSet rs) {
				return rs.one().getString("release_version");
			}
			
		});
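The mapping step above applies a plain function to the value once it is available. In the driver, one() returns null when no rows remain, so a guard may be worth considering for queries that can come back empty. The sketch below illustrates the idea with the JDK's CompletableFuture, where thenApply is the counterpart of a Function transform. The List of Maps is a hypothetical stand-in for ResultSet, and firstVersion and mapVersion are names invented for this example.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch only: a List of Maps is a hypothetical stand-in for the driver's ResultSet.
class RowMappingSketch {

    // Returns the release_version column of the first row, or null when there are no rows.
    static String firstVersion(List<Map<String, String>> rows) {
        return rows.isEmpty() ? null : rows.get(0).get("release_version");
    }

    // thenApply maps the completed value with a plain function (no nested future).
    static CompletableFuture<String> mapVersion(CompletableFuture<List<Map<String, String>>> rows) {
        return rows.thenApply(RowMappingSketch::firstVersion);
    }

    public static void main(String[] args) {
        List<Map<String, String>> oneRow =
                Collections.singletonList(Collections.singletonMap("release_version", "3.0.0"));
        System.out.println(mapVersion(CompletableFuture.completedFuture(oneRow)).join());
    }
}
```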

5 Clean up after completion (very important)

Futures.addCallback(version, new FutureCallback<String>(){

			@Override
			public void onSuccess(String result) {
				LOG.info("cassandra release version is: "+result);
				// Important: remember to close the cluster asynchronously
				cluster.closeAsync();
			}

			@Override
			public void onFailure(Throwable t) {
				LOG.info("error: get release version, "+t.getMessage());
				cluster.closeAsync();
			}
			
		});
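The callback above closes the cluster on both the success path and the failure path. A minimal sketch of the same pattern, using the JDK's CompletableFuture instead of Guava, is shown here; whenComplete runs a single handler in either case, so the close call is written once. FakeCluster, closeCalls and closeWhenDone are hypothetical names used only to make the behaviour observable; they are not part of the driver.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: FakeCluster is a hypothetical stand-in for the driver's Cluster.
class CleanupSketch {

    static class FakeCluster {
        final AtomicInteger closeCalls = new AtomicInteger();

        // Counts calls instead of releasing real connections.
        void closeAsync() {
            closeCalls.incrementAndGet();
        }
    }

    // Attaches one handler that runs whether the future succeeded or failed.
    static CompletableFuture<String> closeWhenDone(CompletableFuture<String> version,
                                                   FakeCluster cluster) {
        return version.whenComplete((result, error) -> {
            if (error != null) {
                System.err.println("error: get release version, " + error.getMessage());
            } else {
                System.out.println("cassandra release version is: " + result);
            }
            cluster.closeAsync(); // reached on both paths
        });
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster();
        closeWhenDone(CompletableFuture.completedFuture("3.0.0"), cluster).join();
        System.out.println("close calls: " + cluster.closeCalls.get());
    }
}
```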


The complete code:

public static void asyncConnect(){
		Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
		
		ListenableFuture<Session> session = cluster.connectAsync();
		
		ListenableFuture<ResultSet> rs = Futures.transform(session, new AsyncFunction<Session, ResultSet>(){

			@Override
			public ListenableFuture<ResultSet> apply(Session session) throws Exception {
				return session.executeAsync("select release_version from system.local");
			}
			
		});
		
		ListenableFuture<String> version = Futures.transform(rs, new Function<ResultSet,String>(){

			@Override
			public String apply(ResultSet rs) {
				return rs.one().getString("release_version");
			}
			
		});
		
		Futures.addCallback(version, new FutureCallback<String>(){

			@Override
			public void onSuccess(String result) {
				LOG.info("cassandra release version is: "+result);
				// Important: remember to close the cluster asynchronously
				cluster.closeAsync();
			}

			@Override
			public void onFailure(Throwable t) {
				LOG.info("error: get release version, "+t.getMessage());
				cluster.closeAsync();
			}
			
		});
	}


