# NotOnlyRedisLock **Repository Path**: cilian/NotOnlyRedisLock ## Basic Information - **Project Name**: NotOnlyRedisLock - **Description**: NotOnlyRedisLock一个应用于springboot项目的,基于redis的分布式锁。框架内部实现了看门狗,重入锁,注解加减锁,简单易用。并且对于没有拿到锁的线程提供了三种方法的管理解决方案。使用者无需手动解锁,只需要在被加锁的方法上面添加方法,即可开启所有功能。 - **Primary Language**: Java - **License**: MulanPSL-2.0 - **Default Branch**: plus - **Homepage**: https://gitee.com/S_Yang/NotOnlyRedisLock - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 4 - **Created**: 2022-04-21 - **Last Updated**: 2022-04-21 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # NotOnlyRedisLock ## 介绍 ​ NotOnlyRedisLock一个应用于springboot项目的,基于redis的在分布式情况下的一系列并发工具实现。包括但不限于**可用于多节点项目防重复业务调用**的分布式锁、**一定数量的线程执行完毕之后被await的线程会在阻塞的情况下被唤醒**的CountDownLatch、**等待一定数量线程达到以后才可以运行执行**的CyclicBarrier以及**可以用于限流操作**的Semaphore等一系列分布式高并发的工具包。 ​ 此外项目在redis分布式上对于大部分场景可能会出现的问题都有相应的解决方案。 ​ 1、redis分布式锁的最小颗粒度为单个方法,在一个项目内可以对**多个方法同时采用该框架实现分布式锁的情况下,互不影响。** ​ 2、redis分布锁的通过注解开启,简单、易用。**使用者无需关心加锁、解锁等问题**,在添加注解之后,框架内部自动处理。 ​ 3、可以通过简单的注解配置达到同机器或不同机器竞争锁。并且对于**没有拿到锁的线程提供了三种管理办法**,可以根据业务的不同选择效率最高的管理办法。 ​ 4、框架内提供了自动创建看门狗机制。由于redis分布锁无法自动续期,所以**在保证线程存活的情况下会给redis分布锁自动续期**,在获得锁的线程处理完业务逻辑以后看门狗自动消失。 ​ 5、框架内实现可重入锁。在已经获得redis分布锁的**线程内部重新需要获取锁的线程无需再次争抢,直接获取锁**,解决了递归情况下的死锁问题。 ## RedisLock 分布式锁 ### Redis分布锁使用步骤 1、SpringBootApplication上添加@EnableLock注解引入锁。 2、在需要开启分布式锁的方法上添加@Lock注解。 ```java @EnableLock public class XXServiceApplication { public static void main(String[] args) { SpringApplication.run(XXServiceApplication.class, args); } } ``` ```java @Lock(policy = LockPolicy.SpinLock) public void hello() { System.out.println("Hello,World"); } ``` #### @Lock参数介绍 | 名称 | 类型 | 默认值 | 备注 | | ------------- | ---------------- | ----------- | ------------------------------------------------------------------------ | | policy | LockPolicy | Lock | 锁策略 默认为SpanLock:利用自旋的方式持续获得锁,直到获取成功 | | lockTimes | Int | 10 | 一个线程持有锁的时间 | | keyPre | String | “” | redis分布式的key值前缀 | | timeOut | long | 10 | 自旋锁进入队列的最大自旋次数 | #### 示例代码 ##### 直接使用 ```java @Lock public boolean service() { //业务代码 } ``` ##### 自旋锁,超过10次后进入队列后慢自旋 ```java @Lock(policy = LockPolicy.SpinLock, timetOut = 10) public void hello() { //业务代码 } ``` ##### 阻塞锁 获取锁失败后阻塞线程,等待唤醒线程唤醒 ```java @Lock(policy = LockPolicy.BlockLock) public void hello() { //业务代码 } ``` **混合锁 综合自旋锁和阻塞锁的优点,自旋次数到达阈值以后进行阻塞等待唤醒** ```java @Lock(policy = LockPolicy.Fix) public void hello() { //业务代码 } ``` ### 重入锁 ​ 在已经获得redis分布锁的线程内部重新需要获取锁的线程无需再次争抢,直接获取到。 ​ 值得注意的是,在该框架下面需要调用方法时候需要使用框架内**Aops.getSelf()**方法去获取方法,否则被注解修饰的方法无法获取锁。 #### Aops.getSelf()使用实例 ```java @Lock(policy = LockPolicy.Fix) public void hello(int times) { if (times<1) return; //Aops.getSelf()方法参数必须是this Aops.getSelf(this).hello(--times); } ``` #### 示例代码 ```java @GetMapping("/testReentrantLock/{id}") @Lock(policy = LockPolicy.SpinLock) @ResponseBody public String testReentrantLock(@PathVariable int id) { final long i = id; log.info(Thread.currentThread().getId() + "处理业务逻辑" + i); if (i <= 1) { return ""; } else { Aops.getSelf(this).testReentrantLock(--id); } return "hello"; } ``` #### 示例结果 ![输入图片说明](https://images.gitee.com/uploads/images/2021/1120/203821_1bbe01b2_7405426.jpeg "DA8300F26117915D566103B63450B4A5.jpg") ### 看门狗机制 ​ 由于redis分布锁为了解决获取锁的线程突然宕机没有及时删除锁key的问题,采用了加锁时候给锁key添加过期时间的办法来解决。但是redis锁无法自动续期,即一个线程在默认的过期时间没有完成自己的业务逻辑处理就会被redis自动删除锁,造成问题。所以在保证线程存活的情况下会给redis分布锁自动续期,在获得锁的线程处理完业务逻辑以后看门狗自动消失。 ​ 由于看门狗是在线程获取锁之后自动添加,所以使用者不需要特意开启。下面是测试看门狗的说明及示例代码和示例结果。 ​ redis分布锁的默认过期时间是10s,所以该测试方法睡眠15s来模拟业务处理时间超过了redis分布式锁的过期时间。当过了redis锁过期时间的1/3的时候会进行一次续命操作。当线程业务逻辑处理完毕会自动取消看门狗,这里称之为“杀狗”。如果看门狗续命了很多次线程的业务逻辑还没有处理完,为了不影响后面的线程处理会结束掉持有锁的线程。 下面展示正常续命情况 #### 示例代码 ```java @GetMapping("/testWatchDog") @Lock(policy = LockPolicy.BlockLock) @ResponseBody public String testWatchDog() throws InterruptedException { TimeUnit.SECONDS.sleep(15); log.info(Thread.currentThread().getId() + "线程业务逻辑处理完毕"); return "hello"; } ``` #### 示例结果 ![输入图片说明](https://images.gitee.com/uploads/images/2021/1120/203831_4c2c46fa_7405426.jpeg "正常续命.jpg") 下面展示业务逻辑超时还没有释放锁的情况 #### 示例代码 ```java @GetMapping("/testWatchDog") @Lock(policy = LockPolicy.SpinLock) @ResponseBody public String testWatchDog() throws InterruptedException { TimeUnit.SECONDS.sleep(100); log.info(Thread.currentThread().getId() + "线程业务逻辑处理完毕"); return "hello"; } ``` #### 示例结果 ![输入图片说明](https://images.gitee.com/uploads/images/2021/1120/203841_ef0bd1c1_7405426.jpeg "超时续命.jpg") ### 没有拿到锁的线程管理 #### 自旋锁队列 ​ 自旋锁分为两个等级,当一个线程获取锁失败以后,会间隔短暂的时间去获取锁,如果在这个阶段自旋的次数到达了短间隔休眠获取锁的阈值,就会进入队列,减少获取锁的次数,减轻redis服务器的负担。进入队列的格式是当前机器的MAC地址+JVMpid号码+线程的id。这个线程间隔一个长时间去队列查看即将出队的是不是自己,如果是自己就出队,去抢占锁,没有就继续休眠。 #### 阻塞锁队列 ​ 阻塞锁在一个线程没有获取到锁的情况下直接将自己的信息写入队列然后阻塞,同时框架内部会开启一个唤醒线程,他会看当前锁的竞争情况去队列唤醒一部分线程去争抢锁,被唤醒的线程拥有更高优先级的抢占机会。他会不间隔时间的抢占锁提升获取到锁的概率。所以为了减轻redis服务器的压力,需要严格控制被唤醒线程的数量,在当前框架下,经笔者测试当唤醒6个线程的效率最高。 #### 混合锁队列 ​ 混合锁是自选锁和阻塞锁的结合体,当一个线程自旋次数到达阈值也没有成功的获取到锁,才会被阻塞,等待唤醒线程去唤醒他。因为阻塞是需要从内核态转到用户态,是一个比较消耗资源的过程。大量的自旋又会造成redis压力特被大,所以这种方式虽然具有很大优势但是对于参数要求比较严格。 ## Semaphore 信号量 ​ 在Java当中的信号量概念是如果线程要访问一个资源就必须先获得信号量。如果信号量内部计数器大于0,信号量减1,然后允许共享这个资源;否则,如果信号量的计数器等于0,信号量将会把线程置入休眠直至计数器大于0.当信号量使用完时,必须释放。 ​ 同样如果在分布式的场景下,这个工具类就无法使用,因为无法对多个部署机器的线程进行管理。 ​ 该框架提供了简单的Semaphore的实现方式,给予使用者创建对象和添加注解两种解决办法。 ### 使用介绍 1、同redis分布锁一样,使用者仍然需要在SpringBootApplication上添加@EnableLock注解引入锁。 ```java @EnableLock public class XXServiceApplication { public static void main(String[] args) { SpringApplication.run(XXServiceApplication.class, args); } } ``` 2、在要进行加信号量的方法上面添加@Semaphore注解 #### @Semaphore参数介绍 | 名称 | 类型 | 默认值 | 备注 | | -------------- | ------- | ------ | :---------------------------- | | SemphoreCounts | Int | 10 | Semaphore信号量的默认许可数量 | | isUseDefault | boolean | True | 是否采用系统默认的解决办法 | #### 注解使用示例代码 ```java @Semaphore() public void hello() { System.out.println("Hello,world!"); } ``` ​ 值得注意的是,直接在方法上面添加注解,默认采用框架内提供的解决办法,即信号量的大小是10,没有获得信号量的线程持续阻塞直到获得信号量执行逻辑。 ​ 3、如果需要根据自己的业务不采用默认的解决办法,需要在注解的isUseDeafult选型添加false,然后在代码里面创建semaphore对象。 #### SemaphoreFactory的getSemaphore()方法参数介绍 | 参数 | 类型 | 备注 | | --------------- | ------------- | ------------------------- | | semaphoreCounts | Integer | Semaphore的许可数量。 | | RedisTemplate | RedisTemplate | springboot下的redis客户端 | #### 使用示例代码 ```java DistributedSemaphore semaphore = SemaphoreFactory.getSemaphore(10, redisTemplate); ``` ​ 示例代码则创建一个10个许可的信号量对象。 #### semaphore方法功能及参数介绍 | 名称 | 返回值类型 | 参数 | 备注 | | :----------------: | ---------- | ------------------------------ | ------------------------------------------------------------ | | acquire() | void | void | 等待获取信号量,如果没有获取到就会一直阻塞直到成功获取 | | acquire() | boolean | int timeout, TimeUnit timeUnit | 在规定的时间内获取许可,如果超过规定的时间仍然没有获得许可则返回false,反之返回true。 | | tryAcquire() | boolean | void | 只尝试一次获取锁,如果失败返回false,成功返回true。 | | release() | void | void | 业务处理完成执行该方法回归信号量 | | availablePermits() | int | void | 查看当前可用的信号量的许可数量 | #### 测试代码 ```java @Semaphore(SemaphoreCounts = 10, isUseDefault = false) public String testSem() { DistributedSemaphore semaphore = SemaphoreFactory.getSemaphore(10, redisTemplate); semaphore.acquire(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } log.info(Thread.currentThread().getId() + "处理业务逻辑"); semaphore.release(); return "hello"; } ``` ## CountDownLatch 闭锁 ​ countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。 #### 使用介绍 1、使用者需要通过CountdownLatchFactory创建DistributedCountDownLatch对象 #### CountDownLatchFactory的getCountDownLatch()方法参数介绍 | 参数 | 类型 | 备注 | | -------------------- | ------------- | -------------------------- | | countDownLatchCounts | Integer | countDownLatch的许可数量。 | | RedisTemplate | RedisTemplate | springboot下的redis客户端 | #### 示例代码 ```java DistributedCountDownLatch myCountDownLatch = CountDownLatchFactory.getCountDownLatch(10, redisTemplate); ``` 示例代码创建了许可为10的CountDownLatch 2、创建好对象以后,在需要等待一定数量线程运行之后的线程执行await方法,其他线程运行结束运行countdown方法。具体的api如下表 #### DistributedCountDownLatch方法功能及参数介绍 | 名称 | 返回值类型 | 参数 | 备注 | | :---------: | ---------- | ---- | ------------------------------------------------------------ | | await() | void | void | 等待一定数量的线程执行完毕之后,执行该方法的线程才会等待许可数量变为0后从阻塞的状态转换为运行状态 | | countDown() | Void | Void | 线程执行一次,许可数量会减1 | #### 测试代码 ```java public String testCountDownLatch() { DistributedCountDownLatch myCountDownLatch = CountDownLatchFactory.getCountDownLatch(10, redisTemplate); for (int i = 0; i < 10; i++) { new Thread(() -> { try { log.info(Thread.currentThread().getId() + "子线程即将睡眠"); TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } finally { myCountDownLatch.countDown(); } }).start(); } for (int i = 0; i < 5; i++) { new Thread(() -> { myCountDownLatch.await(); log.info(Thread.currentThread().getId() + "被唤醒。开始处理业务逻辑"); }).start(); } myCountDownLatch.await(); log.info("主线程被唤醒"); return "hello"; } ``` ## CyclicBarrier 循环栅栏 ​ CyclicBarrier的作用就是会让所有线程都等待完成后才会继续下一步行动。 ​ 举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。 ### 使用介绍 1、使用者需要通过CyclicBarrierFactory获取到DistributedCyclicBarrier #### CyclicBarrierFactory的getCyclicBarrier()方法参数介绍 | 参数 | 类型 | 备注 | | ------------------- | ------------- | ----------------------------- | | CyclicBarrierCounts | Integer | CyclicBarrier可以执行的阈值。 | | RedisTemplate | RedisTemplate | springboot下的redis客户端 | #### 示例代码 ```java DistributedCyclicBarrier myCyc = CyclicBarrierFactory.getCyclicBarrier(10, redisTemplate); ``` 创建一个执行阈值为10的DistributedCyclicBarrier 2、创建好对象以后在方法内执行await方法,该线程会查看当前到达的线程是多少,如果大于阈值就会唤醒阈值数量的阻塞线程执行下面逻辑,如果小于阈值则会阻塞等待后面线程到来唤醒。 #### DistributedCyclicBarrier方法功能及参数介绍 | 名称 | 返回值类型 | 参数 | 备注 | | :----------------: | ---------- | ------- | ------------------------------------------------------------ | | await() | void | void | 在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。 | | getNumberWaiting() | Integer | void | 返回当前在屏障处等待的参与者数目。 | | getParties() | Integer | void | 返回要求 barrier 的参与者数目。 | | isBroken() | Boolean | Void | 查询此屏障是否处于损坏状态。 | | setAlwaysBlock | void | Boolean | 通过这个方法可以更改是否持续的阻塞直至唤醒 | | reset() | void | Void | 将屏障重置为其初始状态。 将是否破损标志位broken置为false。所有阻塞的线程都会被打断。恢复初始状态 | #### 测试代码 ```java public String testCyclicBarrier() { DistributedCyclicBarrier myCyc = CyclicBarrierFactory.getCyclicBarrier(10, redisTemplate); for (int i = 0; i < 1; i++) { new Thread(() -> { myCyc.await(); }).start(); } return "hello"; } ``` ## MultiLock 联锁 待做.. ## ReadWriteLock 读写锁 字面意思,读锁和写锁,两种锁合二为一成为读写锁。为什么要用读写锁呢?因为分布锁锁是拍他所,即防止多线程修改同一条数据造成数据安全性的问题。如果假设业务场景是读多写少,那么使用拍他所会造成资源的浪费。读写锁的读锁是共享锁写锁是互斥锁(读读共享。读写互斥,写写互斥)适用于读多鞋少的场景。 ### 使用介绍 使用者需要通过ReadWriteLockFactory.getReadWriteLock()获取读写锁DistributedReadWriteLock #### ReadWriteLockFactory的getReadWriteLock()方法参数介绍 | 参数 | 类型 | 备注 | | ------------- | ------------- | ------------------------- | | RedisTemplate | RedisTemplate | springboot下的redis客户端 | 获取DistributedReadWriteLock后获得相应的读锁和写锁。 #### DistributedReadWriteLock方法功能及参数介绍 | 名称 | 返回值类型 | 参数 | 备注 | | :------------: | -------------------- | ---- | -------- | | getReadLock() | DistributedReadLock | void | 获取读锁 | | getWriteLock() | DistributedWriteLock | void | 获取写锁 | 然后可以通过读锁或者写锁的lock和unlock方法进行加锁和解锁。 ### 测试代码 ```java public String testReadWriteLock() { String key = "testReadWriteLock"; DistributedReadWriteLock readWriteLock = ReadWriteLockFactory.getReadWriteLock(redisTemplate); DistributedReadLock readLock = readWriteLock.getReadLock(); DistributedWriteLock writeLock = readWriteLock.getWriteLock(); new Thread(() -> { try { readLock.lock(); read(key); } catch (Exception e) { e.printStackTrace(); } finally { readLock.unlock(); } }).start(); new Thread(() -> { try { writeLock.lock(); write(key); } catch (Exception e) { e.printStackTrace(); } finally { writeLock.unlock(); } }).start(); return null; } private void read(String key) { log.info(Thread.currentThread().getId() + ":开始读数据"); String result = (String) redisTemplate.opsForValue().get(key); log.info(Thread.currentThread().getId() + ":读完成,值为:" + result); } private void write(String key) { log.info(Thread.currentThread().getId() + ":开始写数据"); Long result = (Long) redisTemplate.opsForValue().increment(key, 1); log.info(Thread.currentThread().getId() + ":写完成,值为:" + result); } ```