Resilience4J 熔断器
简介
Resilience4j 主要有以下功能:
- CircuitBreaker(熔断器)
- RateLimiter(限流)
- Retry(请求重试)
- 限时
- 缓存
- 信号量的隔离
常用熔断器比较
简单使用
熔断器
- pom.xml 添加依赖
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
- 创建测试用例测试短路器的功能
@Test
public void testCircuitBreaker() {
// 创建断路器配置
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
//故障率阈值百分比(百分之50),超过这个阈值,断路器就会打开
.failureRateThreshold(50)
//断路器保持打开的时间,在到达设置的时间之后,断路器会进入到 half open 状态
.waitDurationInOpenState(Duration.ofMillis(1000))
//当断路器处于half open 状态时,环形缓冲区的大小
.ringBufferSizeInHalfOpenState(2)//半打开次数
.ringBufferSizeInClosedState(2)//至少要调用两次
.build();
//按以上配置创建断路器实例
CircuitBreakerRegistry r1 = CircuitBreakerRegistry.of(config);
CircuitBreaker cb1 = r1.circuitBreaker("yuan11");//创建默认断路器
CircuitBreaker cb2 = r1.circuitBreaker("yuan22", config);// 创建上述配置方案的断路器
// 测试调用
CheckedFunction0<String> supplier = CircuitBreaker.decorateCheckedSupplier(cb1, () -> "hello resilience4j");
Try<String> result = Try.of(supplier)
.map(v -> v + " hello world");
System.out.println(result.isSuccess());
System.out.println(result.get());
}
- 运行测试用例 结果如下:
可以看到正常情况调用了短路请求正常 未触发。 - 手动触发短路器
@Test
public void testCircuitBreaker2() {
// 创建断路器配置
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
//故障率阈值百分比(百分之50),超过这个阈值,断路器就会打开
.failureRateThreshold(50)
//断路器保持打开的时间,在到达设置的时间之后,断路器会进入到 half open 状态
.waitDurationInOpenState(Duration.ofMillis(1000))
//当断路器处于half open 状态时,环形缓冲区的大小
.ringBufferSizeInHalfOpenState(2)//半打开次数
.ringBufferSizeInClosedState(2)//至少要调用两次
.ignoreExceptions(NullPointerException.class)// 忽略空指针异常
.build();
//按以上配置创建断路器实例
CircuitBreakerRegistry r1 = CircuitBreakerRegistry.of(config);
CircuitBreaker cb1 = r1.circuitBreaker("yuan11");
//断路器开始状态
System.out.println(cb1.getState());
//第一次调用 1s
cb1.onError(1, TimeUnit.SECONDS, new NullPointerException());
//获取断路器的第一次状态
System.out.println(cb1.getState());
// 第二次调用1m
cb1.onError(1,TimeUnit.SECONDS, new NullPointerException());
//获取断路器的第二次状态
System.out.println(cb1.getState());
}
- 运行结果如下:
- 运行结果如下:
- 重置断路器
cb1.reset();
System.out.println(cb1.getState());
RateLimiter 限流
- 执行测试方法
@Test
public void testRateLimiter (){
RateLimiterConfig build = RateLimiterConfig.custom()
// 阈值刷新的时间 1 秒
.limitRefreshPeriod(Duration.ofMillis(1000))
// 限制频次
.limitForPeriod(2)
// 限流之后的冷却时间 1秒
.timeoutDuration(Duration.ofMillis(1000))
.build();
RateLimiter limiter = RateLimiter.of("test", build);
CheckedRunnable runnable = RateLimiter.decorateCheckedRunnable(limiter, () -> {
System.out.println(new Date());
});
// 执行4次
Try.run(runnable)
.andThenTry(runnable)
.andThenTry(runnable)
.andThenTry(runnable)
.onFailure(t -> System.out.println(t.getMessage()));
}
- 运行结果 可以看到2次之后,间隔了一秒重新执行
请求重试 Retry
- 修改pom.xml 引入依赖
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
</dependency>
- 执行测试用例:
@Test
public void testRetry(){
RetryConfig config = RetryConfig.custom()
// 重试次数 【小于等于下面的count次数就抛出异常】
.maxAttempts(4)
// 重试间隔
.waitDuration(Duration.ofMillis(500))
// 重试异常
.retryExceptions(RuntimeException.class)
.build();
Retry retry = Retry.of("testRetry", config);
Retry.decorateRunnable(retry, new Runnable() {
int count = 0;
// 重试功能开启后 执行run方法 若抛出异常 会自动触发重试功能
@Override
public void run() {
if (count++ < 4){
System.out.println(count);
throw new RuntimeException();
}
}
}).run();
}
- 运行结果如下:抛出异常后执行了4次重试
限时 Timelimiter
- 执行测试用例
@Test
public void testTimelimiter(){
ExecutorService executorService = Executors.newSingleThreadExecutor();
TimeLimiterConfig config = TimeLimiterConfig.custom()
.timeoutDuration(Duration.ofMillis(600))// 超时时间
.cancelRunningFuture(true)//是否取消线程执行
.build();
TimeLimiter timeLimiter = TimeLimiter.of(config);
Supplier<Future<String>> futureSupplier = () -> {
return executorService.submit(new testRunable());
};
Callable<String> restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter,futureSupplier);
Try.of(restrictedCall::call)
.onFailure(throwable -> System.out.println("We might have timed out or the circuit breaker has opened."));
}
class testRunable implements Callable {
@Override
public Object call() {
for (int i = 0; i < 4; i++) {
System.out.println(i);
throw new RuntimeException();
}
return null;
}
}
- 运行结果如下: 4次循环第一次抛出异常之后就不再执行取消了线程执行。
请求隔离bulkhead
- pom.xml 添加依赖
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
</dependency>
- 执行测试用例
@Test
public void testBulkHead(){
BulkheadConfig config = BulkheadConfig.custom()
.maxConcurrentCalls(2)// 最大并发
.maxWaitDuration(Duration.ofSeconds(1))//获取信号量的最长排队等待时间
//.fairCallHandlingStrategyEnabled(true)// 公平锁策略,等待线程竞争
.writableStackTraceEnabled(true)//BulkheadFullException 发生时减少堆栈跟踪中的信息量
.build();
BulkheadRegistry registry = BulkheadRegistry.of(config);
Bulkhead bulkhead = registry.bulkhead("foo");
Runnable decoratedSupplier = Bulkhead.decorateRunnable(bulkhead,()->{
System.out.println("test");
bulkhead.acquirePermission();
});
for (int i=0; i<4; i++) {
CompletableFuture.runAsync(decoratedSupplier)
.thenAccept(flights -> System.out.println("Received results"));
}
}
- 执行结果如下:最大并发是2,则获取了两次权限之后,不并发了执行完毕了。??