python使用Zookeeper实现操作锁

今天做产品购买的时候看到之前的一些代码,核心代码如下

from kazoo.client import *
from gateway import config


class ZookeeperLock:
    def __init__(self, servers):
        self.zk = KazooClient(",".join(servers))
        last_exception = None
        for i in range(5):
            try:
                self.zk.start()
            except Exception as e:
                last_exception = e
        if last_exception is not None:
            raise last_exception

    def get_lock(self, path, identifier):
        return self.zk.Lock(path, identifier)

    @classmethod
    def instance(cls):
        if not hasattr(cls, '__instance__'):
            cls.__instance__ = ZookeeperLock(config.ZOOKEEPER_SERVERS)
        return cls.__instance__

# ...

lock = ZookeeperLock.instance().get_lock("/xxx/xxx", 'xxx')
lock.acquire()

# ...

lock.release()

在我查阅资料并问了了解Zookeeper的同事后明白了其中的作用。acquire其实是条件阻塞的函数,拿不到锁的话就会阻塞,直到拿锁的把锁释放掉才会执行下面的代码(我们这里是购买产品)。这样做的作用是防止多个地方同时购买,可能同时扣款写数据库导致数据不准确。release用来释放锁。

Zookeeper的python包kazoo文档地址 https://kazoo.readthedocs.io/

为了验证逻辑的准确性,我写了个脚本对zookeeper实现锁操作做了测试。

import os
import sys
import django
import time
import threading


def factorial(name, number):
    print("Task %s: Start ...." % (name))
    lock = ZookeeperLock.instance().get_lock("/xxx/xx", '123123')
    print("Task %s: Get Lock" % (name))
    lock.acquire()
    print("Task %s: Doing ..." % (name))
    for i in range(1, number + 1):
        time.sleep(1)
    lock.release()
    print("Task %s: Unlock" % (name))


if __name__ == "__main__":
    t1 = threading.Thread(target=factorial, args=("A", 5))
    t2 = threading.Thread(target=factorial, args=("B", 2))
    t1.start()
    t2.start()

用两个线程模拟两个操作,A 先拿到锁,所以在前5秒B一直在阻塞状态,等到A释放锁后,B才开始“Doing”。

Task A: Start ....
Task B: Start ....
Task A: Get Lock
Task B: Get Lock
Task A: Doing ...
# 这里了暂停5秒
Task A: Unlock
Task B: Doing ...
# 这里了暂停2秒
Task B: Unlock

经过一些了解,zookeeper就像一个更大的全局变量。接下来在深入研究一下。