0
点赞
收藏
分享

微信扫一扫

【分布式】Redis分布式锁代码实现Demo

zmhc 2022-11-23 阅读 92


分布式锁实现要素

分布式锁实现注意几个要素:

  • 加锁过程原子性:加锁时首先判断key是否存在、是否有值,没有值再设置,这3个步骤需要是原子操作;
  • 锁正常释放:出现超时、网络等问题时,保证加的锁可以正常释放;
  • 锁正确释放:锁A释放时,需要保证只能由加A锁的客户端释放,否则可能就会出现误删锁;
  • 锁高可用保证:分布式锁如Redis或ZK出现宕机时,如何保证加锁功能不被影响,需要根据业务考虑到CP、CA抉择选择合适的实现方式;同时可以添加兜底逻辑,如使用CA模型的ZK分布式锁时,当出现分布式不可用时,可以退化成本地锁或MySQL实现的分布式锁等。

Redis分布式锁代码示例

下面代码实现了Redis分布式锁加锁、解锁的过程:

public interface DistributedLock {
Lock obtainLock(String key);
}

public class RedisDistributedLock implements DistributedLock {

private static final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);
/**
* Get redis lock script.
* param1 - KEYS[1]: redisKey
* params2 - ARGV[1]: clientId
* params3 - ARGV[2]: expire time
*
*/
private static final String OBTAIN_LOCK_SCRIPT =
"local lockClientId = redis.call('GET', KEYS[1])\n" +
"if lockClientId == ARGV[1] then\n" +
" redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
" return true\n" +
"elseif not lockClientId then\n" +
" redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
" return true\n" +
"end\n" +
"return false";
private final String clientId = UUID.randomUUID().toString();
private final long expireTime;
private final Map<String, RedisLocalLock> locks = new ConcurrentHashMap<>();
private final StringRedisTemplate redisTemplate;
private final RedisScript<Boolean> redisScript;
private Executor executorService = Executors.newFixedThreadPool(2, new CustomerThreadFactory("Redis-Lock-Executor"));

static class CustomerThreadFactory implements ThreadFactory {

private String name;

public CustomerThreadFactory(String name) {
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
return new Thread();
}
}

/**
* Prefix path of a redis key, usually use project name.
* eg: in a order service(driver-order),
* order pay redis key like this: driver-order:order:pay:13932032323288
* order cancel redis key like this: driver-order:order:cancel:13289328392932
*/
private final String registerPath;

public RedisDistributedLock(RedisConnectionFactory connectionFactory, String registerPath) {
this(connectionFactory, registerPath, 0);
}

public RedisDistributedLock(RedisConnectionFactory connectionFactory, String registerPath, long expireTime) {
this.redisTemplate = new StringRedisTemplate(connectionFactory);
this.registerPath = registerPath;
this.redisScript = new DefaultRedisScript(OBTAIN_LOCK_SCRIPT, Boolean.class);
this.expireTime = expireTime;
}

public void setExecutor(Executor executor) {
this.executorService = executor;
}

@Override
public Lock obtainLock(String key) {
Assert.notNull(key != null, "key should not null.");
return this.locks.computeIfAbsent(key, RedisLocalLock::new);
}

private final class RedisLocalLock implements Lock {
private String lockKey;

/**
* Local lock, implement re-entry lock and deduct the pressure of redis.
*/
private final ReentrantLock localLock = new ReentrantLock();

/**
* Record the time on acquire lock to calculate the lock is expire.
*/
private volatile long lockAcquireTime;

public RedisLocalLock(String lockKey) {
this.lockKey = RedisDistributedLock.this.registerPath + ":" + lockKey;
}

/**
* Assumed thread-A obtain lock, thread-B try acquire lock will wait
* and cannot be interrupted, if use thread-B.interrupt() method, will wait until it acquired lock.
*/
@Override
public void lock() {
// local lock
localLock.lock();
while (true) {
try {
// redis lock
while (!obtainLock()) {
Thread.sleep(100);
}

// acquired lock then break while
break;
} catch (InterruptedException e) {
//..
} catch (Exception e) {
this.localLock.unlock();
throw new RuntimeException("Cannot acquire lock " + this.lockKey + ", cause=" +e.getMessage());
}
}
}

private boolean obtainLock() {
boolean success = RedisDistributedLock.this.redisTemplate.execute(RedisDistributedLock.this.redisScript,
Collections.singletonList(this.lockKey),
RedisDistributedLock.this.clientId,
String.valueOf(RedisDistributedLock.this.expireTime));
if (success) {
lockAcquireTime = System.currentTimeMillis();
}
return success;
}

/**
* Assumed thread-A obtain lock,
* while thread-B try acquiring lock, if use threadB.interrupter, will occur InterruptedException,
* then unlock will throws java.lang.IllegalMonitorStateException because the lock is not held by threadB.
*
* @throws InterruptedException
*/
@Override
public void lockInterruptibly() throws InterruptedException {
this.localLock.lockInterruptibly();
try {
while (!obtainLock()) {
Thread.sleep(100);
}
} catch (InterruptedException e) {
this.localLock.unlock();
Thread.currentThread().interrupt();
throw e;
} catch (Exception e) {
this.localLock.unlock();
throw new RuntimeException("lock interruptibly exception: " + e.toString());
}

}

/**
* If lock is holding by thread-A, thread-B try acquire lock will return false, else return true.
* @return
*/
@Override
public boolean tryLock() {
try {
return this.tryLock(0, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.interrupted();
return false;
}
}

/**
* If lock is holding by thread-A,
* thread-B will try acquire lock while whole unit time.
*
* @param time
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
boolean lockSuccess = this.localLock.tryLock(time, unit);
if (!lockSuccess) {
return false;
}

long now = System.currentTimeMillis();
long expire = now + TimeUnit.SECONDS.convert(time, unit);
try {
boolean acquired;
while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) {
Thread.sleep(100);
}
// if in time can't acquire lock, then unlock
if (!acquired) {
localLock.unlock();
}
return acquired;

} catch (Exception e) {
this.localLock.unlock();
throw new RuntimeException(e.toString());
}

}

/**
* lock unlock.
*
*/
@Override
public void unlock() {
if (!localLock.isHeldByCurrentThread()) {
throw new IllegalStateException("Lock is not held by current thread");
}

// if lock hold count > 1, local lock -1.
if (localLock.getHoldCount() > 1) {
localLock.unlock();
return;
}

try {
if (Thread.currentThread().isInterrupted()) {
executorService.execute(() -> RedisDistributedLock.this.redisTemplate.delete(lockKey));
} else {
RedisDistributedLock.this.redisTemplate.delete(lockKey);
}
logger.info("Success delete lockKey:{}", lockKey);

} catch (Exception e) {
throw new RuntimeException(e.toString());
} finally {
this.localLock.unlock();
}

}

@Override
public Condition newCondition() {
throw new UnsupportedOperationException("Condition is not supported.");
}
}

}


举报

相关推荐

Redis与分布式-分布式锁

0 条评论