分布式锁实现浅谈

前言

最近看了一些关于分布式锁的实现的一些文章,发现以前写的代码存在很多的错误,对分布式的锁的理解存在很多误区,在这记录一下分布式锁的几种正确的实现方式。
先通过一个场景,我们来探讨一下分布式锁的一些特性。场景是这样的,订单服务部署在两台机器上,用户保存订单的时候,需要实时校验商品的库存并扣减,当两个用户的保存订单的并发请求分别被分发到在这两台机器上时,我们必须保证保存订单和商品库存的扣减就是两个需要保持一致性的写操作。这个时候我们可以对库存这一关键性的分布式资源使用分布式锁。当然可能使用一些队列来处理请求,提升用户的体验,避免因锁的存在而使用户请求被拒绝。如果库存和订单又是两个不同的服务,那还可能需要消息中间件,分布式事务等处理,这里我们只讨论对单一的分布式资源使用锁的情况。
如果我们把目标放一台机器上,我们可能使用,synchronized或者lock之类的同步锁,针对每一个请求,都对保存和库存扣减加锁,操作完后释放锁。分布式锁也是一样的,但是我们必须考虑一些跟单点不一样的问题:

1
2
3
1.加解锁一致性.多台机器都可以加锁和释放锁,我们必须保证每一次加锁和释放锁都是同一台机器,不能一台机器加的锁被另一台机器解锁。那样当加锁的机器的业务操作未完成的时候,但锁被其他机器释放,另一个请求又进来,就会出现资源不一致的情况。
2.死锁。在复杂的生产环境里,我们如果解决了第一个问题的时候,可能加锁的机器在处理后续业务的时候突然宕机,锁没被释放,其他机器一直无法申请加锁,而发生死锁。
3.等待加锁期间的校验。当一台机器加锁之后,另外的机器在申请加锁失败后,尝试校验过期时间,根据过期时间重试。同时考虑减少申请加锁的操作,减轻redis的压力,所以可能需要在申请加锁之前有一个等待时间。

待着这些个问题,我们来看看如何实现分布式锁。传统实现分布式锁的方案一般是利用持久化数据库(如利用InnoDB行锁,或事务,主键或索引冲突),现在一般是利用redis,zookeeper等高效的分布式组件实现分布式锁。

redis分布式锁

对于redis实现分布式锁,我们可以使用increment命令实现一个加锁的原子操作。这里我们使用springboot先写一个简单的加解锁方法。
引入依赖,这里我使用Springboot版本为1.3.6.RELEASE:

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>

加锁操作:

1
2
3
4
5
6
7
8
public void create(String name) throws LockExistsException {
String key = keyForName(name);
if (cacheService.increment(key, 1L) > 1) {
cacheService.expire(name, expiry);
throw new LockExistsException(Constants.LOCK_EXISTS_EXCEPTION);
}
cacheService.expire(name, expiry);
}

释放锁

1
2
3
4
5
6
7
8
9
10
11
12
public boolean release(String name, String value) throws LockNotHeldException {
//获取锁值
String stored = getValue(name);
if (stored != null) {
//锁存在
String key = keyForName(name);
cacheService.remove(key);
log.info("释放锁");
return true;
}
return false;
}

这里当机器获不到锁时,直接抛异常,由业务层对异常处理。可以看到这里,键值对的value大于等于1.当为1时,表示加锁成功。并设置了锁的有效期,避免死锁发生。
但在解锁的时候,非原子操作,存在一种可能:业务处理时间太长,锁过期,其他机器已成功加锁,这个时候将释放其他机器加的锁。于是我们在加解锁中加入一个标示作为value,来区别不同机器加锁。要加入value值,就要操作键值对,键值对的原子操作我们首先想到的是setnx命令,setnx命令在往redis添加键值对的时候,如果当前设置的key已存在,不做任何操作,直接返回0,不存在则添加值并返回1,我们先简单的通过spring-boot-starter-redis实现的加锁操作。spring的api里selfAbsent实现了setnx命令。
加锁操作:

1
2
3
4
5
6
7
public void acquireLock(String name, String value) throws LockExistsException {
String key = keyForName(name);
if (!cacheService.selfAbsent(key, value)) {
throw new LockExistsException(Constants.LOCK_EXISTS_EXCEPTION);
}
cacheService.expire(name, expiry);
}

value为加锁机器标示,由业务层生成。
释放锁:

1
2
3
4
5
6
7
8
9
10
private static final String RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

public void releaseLock(String name, String value) throws LockExistsException {
List<String> keys = new ArrayList<>();
keys.add(keyForName(name));
if (0 == cacheService.executeLua(RELEASE_LOCK_LUA_SCRIPT, keys, value)) {
//解锁失败
throw new LockExistsException(Constants.LOCK_EXISTS_EXCEPTION);
}
}

为实现解锁的原子性,我们执行lua脚本的方式删除,lua脚本很简单,当通过key获取到的值等于传入的value,即检查加解锁的标示是否一致,表明是否是同一次请求的加解锁。这样我们满足了加解锁的一致性,同时避免了死锁的发生。优化后的代码可以解决我们开头引入的场景中订单的重复提交问题,我们可以把订单id作为锁的key处理。但是对于不同用户的请求,当一个请求操作库存的时候,另一个请求应该处于申请加锁的等待过程中。于是我们可以设置一个获取锁的超时时间,超过等待时间,再抛出异常。当然依旧无法满足高并发时的订单请求,这个可以采用消息队列的方式进行处理。这里我们只讨论如何解决开头提到的第三个问题。
第三个问题的解决方案实现要复杂一点,总体来说就是不断的重试来获取锁,为了减轻redis压力,在重试过程中,去订阅redis锁释放的消息,当获取到redis的消息,再次重试。这里我就不重复造轮子了。给出现有的解决方案:Redisson。
参考:Redisson 分布式锁实现分析
Redisson github

zookeeper分布式锁

使用zookeeper实现分布式锁,我们就不必在造轮子了,Apache Curator已经为我们提供了大量的zookeeper操作api,并解决了我们在操作zookeeper时的连接异常,session失效等问题。对于实现分布式锁,Curator已经提供了像InterProcessMutex可重入锁,InterProcessSemaphoreMutex不可重入锁等许多锁的解决方案。这里我只针对上面redis实现的重复提交的思路,用zookeeper来实现。
引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.11.1</version>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.1</version>
</dependency>

实现锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ZookeeperLockServiceImpl implements ZookeeperLockService{

private CuratorFramework client;

public ZookeeperLockServiceImpl(ZookeeperProfile zookeeperProfile){
client = CuratorFrameworkFactory.newClient(zookeeperProfile.getConnectString(), zookeeperProfile.getRetryPolicy());
client.start();
}

@Override
public boolean acquire(String keyPath, long time, TimeUnit unit) {
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client,keyPath);
try {
return lock.acquire(time,unit);
}catch (Exception e){

}
return false;
}

@Override
public void release(String keyPath) {
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client,keyPath);
try {
lock.release();
}catch (Exception e){

}
}
}

这里只是提供大概的思路,关于锁异常的处理,等待锁释放的监听,后续研究中。

坚持原创技术分享,您的支持将鼓励我继续创作!