帮我实现分布式锁的测试代码
时间: 2023-07-11 10:29:42 浏览: 50
好的,下面是一个简单的分布式锁测试代码,使用 Redis 实现分布式锁:
```
import redis
import time
import uuid
class RedisLock:
def __init__(self, key, timeout=10, sleep=0.1, redis_pool=None):
self.key = key
self.timeout = timeout
self.sleep = sleep
self.redis_pool = redis_pool or redis.ConnectionPool(host='localhost', port=6379, db=0)
self.lock_id = None
def acquire(self):
conn = redis.Redis(connection_pool=self.redis_pool)
while self.timeout > 0:
self.lock_id = str(uuid.uuid4())
if conn.set(self.key, self.lock_id, nx=True, ex=self.timeout):
return True
self.timeout -= self.sleep
time.sleep(self.sleep)
return False
def release(self):
conn = redis.Redis(connection_pool=self.redis_pool)
script = """
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
"""
conn.eval(script, 1, self.key, self.lock_id)
# 测试代码
def test_lock():
redis_pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
lock = RedisLock('test_lock', redis_pool=redis_pool)
if lock.acquire():
print("获得锁")
time.sleep(5)
lock.release()
print("释放锁")
else:
print("获取锁失败")
if __name__ == '__main__':
test_lock()
```
在上面的代码中,我们使用 Redis 实现了一个分布式锁,其中 `RedisLock` 类封装了获取和释放锁的方法,通过调用 `acquire()` 方法获取锁,调用 `release()` 方法释放锁。
在 `test_lock()` 函数中,我们创建了一个 `RedisLock` 对象,并尝试获取锁。如果获取成功,我们打印“获得锁”并等待 5 秒钟,然后释放锁;如果获取失败,我们打印“获取锁失败”。
您可以在多个客户端上运行此代码,测试分布式锁的效果。