欢迎投稿

今日深度:

使用python-cassandra遇到的一个问题,

使用python-cassandra遇到的一个问题,


今天和运维在一起测试我写的一个程序时,在日志中发现了一个error,然而我写完程序后自测时是OK的,想起测试之前我拍着胸脯打包票说“我的程序肯定没问题”时,红扑扑的小脸没地儿搁了。

有问题那就解决!

先说下程序背景,我的程序的目的是从Cassandra集群中取数据,然后根据取得的数据生成一个配置文件供别的程序使用。程序结构还蛮简单的,用Python来写。

Cassandra集群有6台,这里就用编号1-6表示。连接的代码如下:


# 设置登陆Cassandra的用户、密码
auth = PlainTextAuthProvider(_config['username'], _config['password'])

cluster = Cluster(_config['addresses'], auth_provider = auth)

session = cluster.connect(_config['keyspace'])

具体使用cql语句的时候就像session.execute(cql)这样子使用。

好了,以上代码在自己的测试环境中跑毫无问题。

but,运维跑得时候就有问题了,日志如下:(这里只贴了关键日志)

2016-12-13 15:26:42,725 - cassandra.pool - DEBUG - 76 - create_file_for_mbw : Host 1 is now marked up
2016-12-13 15:26:42,725 - cassandra.pool - DEBUG - 76 - create_file_for_mbw : Host 2 is now marked up
2016-12-13 15:26:42,725 - cassandra.pool - DEBUG - 76 - create_file_for_mbw : Host 3 is now marked up
2016-12-13 15:26:42,725 - cassandra.pool - DEBUG - 76 - create_file_for_mbw : Host 4 is now marked up
2016-12-13 15:26:42,726 - cassandra.pool - DEBUG - 76 - create_file_for_mbw : Host 5 is now marked up
2016-12-13 15:26:42,725 - cassandra.pool - DEBUG - 76 - create_file_for_mbw : Host 6 is now marked up
2016-12-13 15:26:42,970 - cassandra.cluster - DEBUG - 76 - create_file_for_mbw : [control connection] Finished fetching ring info
######  到这里是收集完成Cassandra集群信息

2016-12-13 15:26:42,970 - cassandra.cluster - DEBUG - 76 - create_file_for_mbw : [control connection] Rebuilding token map due to topology changes
2016-12-13 15:26:43,077 - cassandra.cluster - DEBUG - 76 - create_file_for_mbw : Control connection created

######  开始初始化与集群中4、5Cassandra数据库的连接
2016-12-13 15:26:43,078 - cassandra.pool - DEBUG - 55 - thread : Initializing connection for host 5
2016-12-13 15:26:43,080 - cassandra.pool - DEBUG - 55 - thread : Initializing connection for host 4

######   在连接4、5Cassandra的时候完成用户认证
2016-12-13 15:26:43,253 - cassandra.connection - DEBUG - 378 - asyncorereactor : Connection <AsyncoreConnection(140278113775440) 10.1.5.205:9042> successfully authenticated
2016-12-13 15:26:43,265 - cassandra.connection - DEBUG - 378 - asyncorereactor : Connection <AsyncoreConnection(140278096170704) 10.1.5.204:9042> successfully authenticated

######   完成与4、5Cassandra的连接
2016-12-13 15:26:43,302 - cassandra.pool - DEBUG - 55 - thread : Finished initializing connection for host 5
2016-12-13 15:26:43,303 - cassandra.cluster - DEBUG - 55 - thread : Added pool for host 5 to session
2016-12-13 15:26:43,304 - cassandra.pool - DEBUG - 55 - thread : Finished initializing connection for host 4
2016-12-13 15:26:43,305 - cassandra.cluster - DEBUG - 55 - thread : Added pool for host 4 to session

######  准备连接6、1
2016-12-13 15:26:43,303 - cassandra.pool - DEBUG - 55 - thread : Initializing connection for host 6
2016-12-13 15:26:43,306 - cassandra.pool - DEBUG - 55 - thread : Initializing connection for host 1

######   我的程序执行日志。。。在这里会使用通过cluster.connect()连接后的session来执行cql语句。

######   我的程序执行完毕

######   关闭与集群的连接
2016-12-13 15:26:43,494 - cassandra.cluster - DEBUG - 24 - atexit : Shutting down Cluster Scheduler
2016-12-13 15:26:43,495 - cassandra.cluster - DEBUG - 24 - atexit : Shutting down control connection

######   关闭1、5、4、6的连接
2016-12-13 15:26:43,495 - cassandra.io.asyncorereactor - DEBUG - 316 - asyncorereactor : Closing connection (140278113772368) to 1
2016-12-13 15:26:43,495 - cassandra.io.asyncorereactor - DEBUG - 320 - asyncorereactor : Closed socket to 1
2016-12-13 15:26:43,496 - cassandra.io.asyncorereactor - DEBUG - 316 - asyncorereactor : Closing connection (140278113775440) to 5
2016-12-13 15:26:43,496 - cassandra.io.asyncorereactor - DEBUG - 320 - asyncorereactor : Closed socket to 5
2016-12-13 15:26:43,496 - cassandra.io.asyncorereactor - DEBUG - 316 - asyncorereactor : Closing connection (140278096170704) to 4
2016-12-13 15:26:43,497 - cassandra.io.asyncorereactor - DEBUG - 320 - asyncorereactor : Closed socket to 4
2016-12-13 15:26:48,306 - cassandra.io.asyncorereactor - DEBUG - 316 - asyncorereactor : Closing connection (140278095583888) to 6
2016-12-13 15:26:48,306 - cassandra.io.asyncorereactor - DEBUG - 320 - asyncorereactor : Closed socket to 6

###### 连接到6,在进行用户认证的时候报错,连接失败,OperationTimedOut
2016-12-13 15:26:48,306 - cassandra.connection - DEBUG - 324 - asyncorereactor : Connection to 6 was closed during the authentication process
2016-12-13 15:26:48,307 - cassandra.cluster - WARNING - 55 - thread : Failed to create connection pool for new host 6:
Traceback (most recent call last):
  File "cassandra/cluster.py", line 2300, in cassandra.cluster.Session.add_or_renew_pool.run_add_or_renew_pool (cassandra/cluster.c:40585)
    new_pool = HostConnection(host, distance, self)
  File "cassandra/pool.py", line 329, in cassandra.pool.HostConnection.__init__ (cassandra/pool.c:6245)
    self._connection = session.cluster.connection_factory(host.address)
  File "cassandra/cluster.py", line 1119, in cassandra.cluster.Cluster.connection_factory (cassandra/cluster.c:15085)
    return self.connection_class.factory(address, self.connect_timeout, *args, **kwargs)
  File "cassandra/connection.py", line 333, in cassandra.connection.Connection.factory (cassandra/connection.c:5790)
    raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout)
OperationTimedOut: errors=Timed out creating connection (5 seconds), last_host=None

######   准备连接1
2016-12-13 15:26:48,311 - cassandra.pool - DEBUG - 55 - thread : Initializing connection for host 3

######   连接到1,进行用户认证的时候报错,连接失败,OperationTimedOut
2016-12-13 15:26:48,312 - cassandra.connection - DEBUG - 324 - asyncorereactor : Connection to 1 was closed during the authentication process
2016-12-13 15:26:48,313 - cassandra.cluster - WARNING - 55 - thread : Failed to create connection pool for new host 1:
Traceback (most recent call last):
  File "cassandra/cluster.py", line 2300, in cassandra.cluster.Session.add_or_renew_pool.run_add_or_renew_pool (cassandra/cluster.c:40585)
    new_pool = HostConnection(host, distance, self)
  File "cassandra/pool.py", line 329, in cassandra.pool.HostConnection.__init__ (cassandra/pool.c:6245)
    self._connection = session.cluster.connection_factory(host.address)
  File "cassandra/cluster.py", line 1119, in cassandra.cluster.Cluster.connection_factory (cassandra/cluster.c:15085)
    return self.connection_class.factory(address, self.connect_timeout, *args, **kwargs)
  File "cassandra/connection.py", line 333, in cassandra.connection.Connection.factory (cassandra/connection.c:5790)
    raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout)
OperationTimedOut: errors=Timed out creating connection (5 seconds), last_host=None

根据以上的注释,看到这里应该很明白了,在拿到了4、5的连接,还没等所有连接完成,就开始执行我的程序了,我的程序执行完毕后cluster就开始shutdown了,而这时剩下的4个连接还没有连接完成,会继续走他的流程,去连接Cassandra,但是连接已经关闭,所以就报如上错误了。

对于这种问题,第一直觉应该是有配置项的,翻了源码(cluster__init__方法中的配置项),找了文档,没找到相关的,苦恼不已。最后在看到clusterconnect()方法时,看到这个方法是这样定义的:

def connect(self, keyspace=None, wait_for_all_pools=False)

wait_for_all_pools是什么鬼?往下继续翻。

session = self._new_session(keyspace)

if wait_for_all_pools:

    wait_futures(session._initial_connect_futures)
return session

感觉有戏了!

继续找wait_futures方法,在开头看到了

from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures

最后在concurrent.futures._base中找到了

def wait(fs, timeout=None, return_when=ALL_COMPLETED)

而这个方法的介绍是 **Wait for the futures in the given sequence to complete.
** 。恩,让我脸红的就是你了。

culster.connect()方法加上参数wait_for_all_pools=True,再让运维试了下,OK了!

wait_for_all_pools=True的设置是让集群中所有连接都连上后再返回一个session,而默认的做法是客户端只要与集群中有连接便返回session

心情舒畅~~

www.htsjk.Com true http://www.htsjk.com/cassandra/27845.html NewsArticle 使用python-cassandra遇到的一个问题, 今天和运维在一起测试我写的一个程序时,在日志中发现了一个error,然而我写完程序后自测时是OK的,想起测试之前我拍着胸脯打包票说“我的程序...
相关文章
    暂无相关文章
评论暂时关闭