0
点赞
收藏
分享

微信扫一扫

redisson支持高并发的RBucket

1.引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

2.配置类创建bean

/**
 * @author qujingye
 * @Classname RedissonConfig
 * @Description TODO
 * @Date 2023/11/16 16:27
 */
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String redisPassword;



    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        //单机模式  依次设置redis地址和密码
        config.useSingleServer().
                setAddress("redis://" + host + ":" + port).
                setPassword(redisPassword);
        return Redisson.create(config);
    }
}

3.yml配置

redis:
  database: 0
  host: localhost
  lettuce:
      pool:
        max-active: 8   #最大连接数据库连接数,设 0 为没有限制
        max-idle: 8     #最大等待连接中的数量,设 0 为没有限制
        max-wait: -1ms  #最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。
        min-idle: 0     #最小等待连接中的数量,设 0 为没有限制
      shutdown-timeout: 100ms
    password: 123456
    port: 6379

4.使用

@Resource
private RedissonClient redissonClient;

// 创建锁对象
RLock redisLock = redissonClient.getLock("lock:xxxxx");
// 尝试获取锁
boolean isLock = redisLock.tryLock();
// 判断
if (!isLock) {
    // 获取锁失败,直接返回失败
    throw new CommonException(-1, "监测到文件" + originalFilename + ",正在导入请稍后在试!");
}
@Resource
private RedissonClient redissonClient;
@Test
void testRedisson() throws InterruptedException {
    // 获取锁(可重入),指定锁的名称
    RLock lock = redissonClient.getLock("anyLock"); 
    // 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
    // 判断释放获取成功
    if(isLock){
        try {
            System.out.println("执行业务");
        }finally {
            // 释放锁
            lock.unlock();
        }
    }
}

5.api方法简介

 //1. 普通的可重入锁
    RLock lock = redissonClient.getLock("generalLock");

    // 拿锁失败时会不停的重试
    // 具有Watch Dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s
    lock.lock();

    // 尝试拿锁10s后停止重试,返回false
    // 具有Watch Dog 自动延期机制 默认续30s
    boolean res1 = lock.tryLock(10, TimeUnit.SECONDS);

    // 拿锁失败时会不停的重试
    // 没有Watch Dog ,10s后自动释放
    lock.lock(10, TimeUnit.SECONDS);

    // 尝试拿锁100s后停止重试,返回false
    // 没有Watch Dog ,10s后自动释放
    boolean res2 = lock.tryLock(100, 10, TimeUnit.SECONDS);

    //2. 公平锁 保证 Redisson 客户端线程将以其请求的顺序获得锁
    RLock fairLock = redissonClient.getFairLock("fairLock");

6.redisson原理

首先分布式锁要考虑
分布式锁需要考虑
互斥性 setnx
防死锁(过期,锁续命)
可重复性
高性能
而Redisson满足 实现了 锁续命 锁错删 可重入
[图片]

"if (redis.call('exists', KEYS[1]) == 0) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
        "end; " +
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
        "end; " +
        "return redis.call('pttl', KEYS[1]);",

客户端线程在底层是如何实现加锁的:
第一步,redisson.getLock(“mylock”);可以获得一个 Redisson 的分布式锁对象,可以使用该对象进行加锁、释放锁、判断锁状态等操作。
第二步加锁,加锁的底层逻辑是通过lua脚本实现的,如果客户端线程第一次去加锁的话,
执行 ‘hincrby’ 会在key对应的hash数据结构当中,添加线程标识。HashKey 是UUID:TreadID value

  1. 来指定该线程当前对这个key已经加锁一次了,并且设置锁的过期时间为30秒
  2. 客户端线程是如何维持加锁的,当加锁成功后,此时会对加锁的结果设置一个监听器,如果监听到加锁成功了,也就是返回的结果时null,此时就会在后台通过watchdog 看门狗机制,
    启动一个后台的定时任务,每隔10秒执行一次,检查当前key依旧存在,就会重置key的存活时间为30秒,维持加锁,底层就是通过后台这样一个,线程定时刷新存活时间维系的(renewExpiration自己掉自己)
  3. 相同的客户端线程是如何实现可重入加锁的
    第一次加锁时 会往key对应的hash数据结构中 设置 UUID:ThreadID 1 表示当前线程对key 加锁一次
    如果相同线程再来对这个key加锁,只需要将UUID:ThreadID 持有锁的次数加1 即可 就为
    UUID:ThreadID 2 了,redisson底层就是通过这样的数据结构 来表示重入锁的
  4. 其他线程加锁失败时,底层是如何实现阻塞的
    通过key对应的hash结构当中的UUID:ThreadID 判断是否为当前线程id 如果不是则线程加锁失败
    如果没有获取锁的超时时间 此时就会进入一个 while 的死循环中 一直尝试加锁 直到加锁成功才会返回
  5. 客户端宕机了 锁如何释放的
    客户端宕机后 相应的watchdog 后台定时任务 当然已经没了 此时就无法对key 进行定时续期 那么当指定存活时间过后 key就会自动失效 锁就当然自动释放了
  6. 客户端如何主动释放持有的锁
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
        "return nil;" +
        "end; " +
        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
        "if (counter > 0) then " +
        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
        "return 0; " +
        "else " +
        "redis.call('del', KEYS[1]); " +
        "redis.call('publish', KEYS[2], ARGV[1]); " +
        "return 1; " +
        "end; " +
        "return nil;",

客户端主动释放底层,也是通过执行lua脚本的方式来实现的
如果判断当前释放的key存在,并且在key的hash结构当中,存在当前线程的加锁信息,那么此时就会减扣当前线程,对这个key的重入锁次数,减扣线程的重入锁次数之后,如果当前线程在这个key的重入次数为0,此时就会直接释放锁,如果当前线程,在这个key中重入锁次数依然大于0。此时就直接重置一下,key的续期时间为30秒
7. 客户端尝试获取锁超时时间的机制底层是如何实现的

boolean isLock2 = redisLock.tryLock(1,TimeUnit.MINUTES);

如果在加锁时就指定 尝试获取锁的超时时间 如果获取锁失败 此时就不会永无止境的在while循环里面一直等待 ,而是根据你指定的锁超时时间,在这段时间范围获取不到锁,那么就会标记为获取锁失败,直接返回false
8. 客户端锁超时自动释放锁机制底层是怎么实现的

redisLock.lock(1, TimeUnit.MINUTES);

如果在加锁的时候指定了锁的超时时间,那么就算你获取锁成功了,也不会开启watch dog的定时任务,此时就将当前持有的这把锁的过期时间,设置为你指定的超时时间,那么你指定的时间到了之后,key失效被删除了,key对应的锁相应的也就自动释放了

举报

相关推荐

0 条评论