使用 Redis 实现分布式系统轻量级协调技术(1)
在分布式系统中,各个进程(本文使用进程来描述分布式系统中的运行主体,它们可以在同一个物理节点上也可以在不同的物理节点上)相互之间通常是需要协调进行运作的,有时是不同进程所处理的数据有依赖关系,必须按照一定的次序进行处理,有时是在一些特定的时间需要某个进程处理某些事务等等,人们通常会使用分布式锁、选举算法等技术来协调各个进程之间的行为。因为分布式系统本身的复杂特性,以及对于容错性的要求,这些技术通常是重量级的,比如 Paxos 算法,欺负选举算法,ZooKeeper 等,侧重于消息的通信而不是共享内存,通常也是出了名的复杂和难以理解,当在具体的实现和实施中遇到问题时都是一个挑战。
Redis 经常被人们认为是一种 NoSQL 软件,但其本质上是一种分布式的数据结构服务器软件,提供了一个分布式的基于内存的数据结构存储服务。在实现上,仅使用一个线程来处理具体的内存数据结构,保证它的数据操作命令的原子特性;它同时还支持基于 Lua 的脚本,每个 Redis 实例使用同一个 Lua 解释器来解释运行 Lua 脚本,从而 Lua 脚本也具备了原子特性,这种原子操作的特性使得基于共享内存模式的分布式系统的协调方式成了可能,而且具备了很大的吸引力,和复杂的基于消息的机制不同,基于共享内存的模式对于很多技术人员来说明显容易理解的多,特别是那些已经了解多线程或多进程技术的人。在具体实践中,也并不是所有的分布式系统都像分布式数据库系统那样需要严格的模型的,而所使用的技术也不一定全部需要有坚实的理论基础和数学证明,这就使得基于 Redis 来实现分布式系统的协调技术具备了一定的实用价值,实际上,人们也已经进行了不少尝试。本文就其中的一些协调技术进行介绍。
signal/wait 操作
在分布式系统中,有些进程需要等待其它进程的状态的改变,或者通知其它进程自己的状态的改变,比如,进程之间有操作上的依赖次序时,就有进程需要等待,有进程需要发射信号通知等待的进程进行后续的操作,这些工作可以通过 Redis 的 Pub/Sub 系列命令来完成,比如:
- import redis, time
- rc = redis.Redis()
- def wait( wait_for ):
- ps = rc.pubsub()
- ps.subscribe( wait_for )
- ps.get_message()
- wait_msg = None
- while True:
- msg = ps.get_message()
- if msg and msg['type'] == 'message':
- wait_msg = msg
- break
- time.sleep(0.001)
- ps.close()
- return wait_msg
- def signal_broadcast( wait_in, data ):
- wait_count = rc.publish(wait_in, data)
- return wait_count
用这个方法很容易进行扩展实现其它的等待策略,比如 try wait,wait 超时,wait 多个信号时是要等待全部信号还是任意一个信号到达即可返回等等。因为 Redis 本身支持基于模式匹配的消息订阅(使用 psubscribe 命令),设置 wait 信号时也可以通过模式匹配的方式进行。
和其它的数据操作不同,订阅消息是即时易逝的,不在内存中保存,不进行持久化保存,如果客户端到服务端的连接断开的话也是不会重发的,但是在配置了 master/slave 节点的情况下,会把 publish 命令同步到 slave 节点上,这样我们就可以同时在 master 以及 slave 节点的连接上订阅某个频道,从而可以同时接收到发布者发布的消息,即使 master 在使用过程中出故障,或者到 master 的连接出了故障,我们仍然能够从 slave 节点获得订阅的消息,从而获得更好的鲁棒性。另外,因为数据不用写入磁盘,这种方法在性能上也是有优势的。
上面的方法中信号是广播的,所有在 wait 的进程都会收到信号,如果要将信号设置成单播,只允许其中一个收到信号,则可以通过约定频道名称模式的方式来实现,比如:
频道名称 = 频道名前缀 (channel) + 订阅者全局唯一 ID(myid)
其中唯一 ID 可以是 UUID,也可以是一个随机数字符串,确保全局唯一即可。在发送 signal 之前先使用“pubsub channels channel*”命令获得所有的订阅者订阅的频道,然后发送信号给其中一个随机指定的频道;等待的时候需要传递自己的唯一 ID,将频道名前缀和唯一 ID 合并为一个频道名称,然后同前面例子一样进行 wait。示例如下:
- import random
- single_cast_script="""
- local channels = redis.call('pubsub', 'channels', ARGV[1]..'*');
- if #channels == 0 then
- return 0;
- end;
- local index= math.mod(math.floor(tonumber(ARGV[2])), #channels) + 1;
- return redis.call( 'publish', channels[index], ARGV[3]);
- """
- def wait_single( channel, myid):
- return wait( channel + myid )
- def signal_single( channel, data):
- rand_num = int(random.random() * 65535)
- return rc.eval( single_cast_script, 0, channel, str(rand_num), str(data) )