1.简介
(1).作用
Hystrix是SpringCloud集成的容错组件,主要用于避免级联故障,提高系统可用性。
Hystrix提供的功能主要包括,服务熔断、服务限流、服务降级和服务监控。
(2).服务熔断
当服务A调用的服务B不可用时,服务A为了保证自己业务不受影响,从而不再调用服务B,直接返回一个结果,直到服务B可用。
(3).服务限流
在高并发请求下,为了保护系统,可以对访问服务的请求进行数量上的限制。
(4).服务降级
当服务器压力剧增的情况下,根据实际业务情况及流量,对一些服务有策略地不处理或换种简单的方式处理,从而释放服务器资源以保证核心业务正常运作。
2.架构图
(1).工作流程
- 调用入口方法
- 调用toObservable()
- 判断是否有缓存
- 熔断是否开启
- 限流是否触发
- 业务执行有没有失败
- 业务执行有没有超时
(2).引入依赖
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.18</version>
</dependency>
3.HystrixCommand
(1).CommandDemo
会以隔离的形式完成run方法调用,默认线程池隔离。
public class CommandDemo extends HystrixCommand<String> {
private String name;
public CommandDemo(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CommandDemo")));
this.name = name;
}
@Override
protected String run() {
String result = "CommandDemo name:" + name;
System.err.println(result + ",currentThread:" + Thread.currentThread().getName());
return result;
}
}
(2).execute()
public class CommandTest {
@Test
public void executeTest() {
long beginTime = System.currentTimeMillis();
CommandDemo commandDemo = new CommandDemo("execute");
//同步执行Command
String result = commandDemo.execute();
long endTime = System.currentTimeMillis();
System.out.println(result + ",cost time:" + (endTime - beginTime) + "ms");
}
}
(3).queue()
public class CommandTest {
@Test
public void queueTest() throws ExecutionException, InterruptedException {
long beginTime = System.currentTimeMillis();
CommandDemo commandDemo = new CommandDemo("queue");
//异步执行Command
Future<String> queue = commandDemo.queue();
long endTime = System.currentTimeMillis();
System.out.println("future end,cost time:" + (endTime - beginTime) + "ms");
long endTime2 = System.currentTimeMillis();
System.out.println(queue.get() + ",cost time:" + (endTime2 - beginTime) + "ms");
}
}
(4).observe()
public class CommandTest {
@Test
public void observeTest() {
long beginTime = System.currentTimeMillis();
CommandDemo commandDemo = new CommandDemo("observe");
Observable<String> observe = commandDemo.observe();
//阻塞式调用
String result = observe.toBlocking().single();
long endTime = System.currentTimeMillis();
System.out.println(result + "cost time:" + (endTime - beginTime) + "ms");
}
}
public class CommandTest {
@Test
public void observeTest() {
long beginTime = System.currentTimeMillis();
CommandDemo commandDemo = new CommandDemo("observe");
Observable<String> observe = commandDemo.observe();
//阻塞式调用
String result = observe.toBlocking().single();
long endTime = System.currentTimeMillis();
System.out.println(result + "cost time:" + (endTime - beginTime) + "ms");
//非阻塞式调用
observe.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.err.println("observe,onCompleted");
}
@Override
public void onError(Throwable throwable) {
System.err.println("observe,onError-throwable:" + throwable);
}
@Override
public void onNext(String result) {
long endTime = System.currentTimeMillis();
System.err.println(result + "cost time:" + (endTime - beginTime) + "ms");
}
});
}
}
(5).observe()
public class CommandTest {
@Test
public void toObserveTest() throws InterruptedException {
long beginTime = System.currentTimeMillis();
CommandDemo commandDemo1 = new CommandDemo("toObservable1");
Observable<String> toObservable1 = commandDemo1.toObservable();
//阻塞式调用
String result = toObservable1.toBlocking().single();
long endTime = System.currentTimeMillis();
System.out.println(result + "cost time:" + (endTime - beginTime) + "ms");
}
}
public class CommandTest {
@Test
public void toObserveTest() throws InterruptedException {
long beginTime = System.currentTimeMillis();
CommandDemo commandDemo1 = new CommandDemo("toObservable1");
Observable<String> toObservable1 = commandDemo1.toObservable();
//阻塞式调用
String result = toObservable1.toBlocking().single();
long endTime = System.currentTimeMillis();
System.out.println(result + "cost time:" + (endTime - beginTime) + "ms");
CommandDemo commandDemo2 = new CommandDemo("toObservable2");
Observable<String> toObservable2 = commandDemo2.toObservable();
//非阻塞式调用
toObservable2.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.err.println("toObservable,onCompleted");
}
@Override
public void onError(Throwable throwable) {
System.err.println("toObservable,onError - throwable=" + throwable);
}
@Override
public void onNext(String result) {
long endTime = System.currentTimeMillis();
System.err.println(result + "cost time:" + (endTime - beginTime) + "ms");
}
});
Thread.sleep(2000l);
}
}
(6).四个方法的区别
- execute:单次处理,同步执行
- queue:单次处理,异步执行
- observe:多次处理,Hot处理
- 执行Command的run方法
- 加载注册Subscriber对象
- 将run方法结果注入到Subscriber对象的onNext方法
- toObserve:多次处理,Cold处理,每次订阅都需要一个新的Command对象
- 加载注册Subscriber对象
- 执行Command的run方法
- 将run方法结果注入到Subscriber对象的onNext方法
4.ObserveableCommand
(1).ObserveableCommandDemo
使用当前线程进行调用,默认信号量隔离,也可以通过设置使用不同线程执行。
public class ObservableCommandDemo extends HystrixObservableCommand<String> {
private String name;
public ObservableCommandDemo(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ObservableCommandDemo"))
.andCommandKey(HystrixCommandKey.Factory.asKey("ObservableCommandDemo")));
this.name = name;
}
@Override
protected Observable<String> construct() {
System.err.println("currentThread:" + Thread.currentThread().getName());
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//业务处理
subscriber.onNext("action 1,name:" + name);
subscriber.onNext("action 2,name:" + name);
subscriber.onNext("action 3,name:" + name);
//业务处理结束
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
}
(2).observe()
public class ObservableCommandTest {
@Test
public void observeTest() throws InterruptedException {
long beginTime = System.currentTimeMillis();
ObservableCommandDemo observableCommandDemo = new ObservableCommandDemo("ObservableCommandDemo-observe");
Observable<String> observe = observableCommandDemo.observe();
//非阻塞式调用
observe.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.err.println("ObservableCommandDemo-observe,onCompleted");
}
@Override
public void onError(Throwable throwable) {
System.err.println("ObservableCommandDemo-observe,onError-throwable=" + throwable);
}
@Override
public void onNext(String result) {
long endTime = System.currentTimeMillis();
System.err.println(result + "cost time:" + (endTime - beginTime) + "ms");
}
});
Thread.sleep(1000l);
}
}
5.请求缓存
(1).简介
- Hystrix支持将请求结果进行本地缓存
- 请求缓存要求必须在同一个上下文
- 通过实现getCacheKey方法来判断是否取出缓存
- 可以通过RequestCacheEnabled开启请求缓存
(2).代码示例
public class CommandTest {
@Test
public void requestCache() {
//开启请求上下文
HystrixRequestContext requestContext = HystrixRequestContext.initializeContext();
long beginTime = System.currentTimeMillis();
CommandDemo c1 = new CommandDemo("c1");
CommandDemo c2 = new CommandDemo("c2");
CommandDemo c3 = new CommandDemo("c3");
//第一次请求
String r1 = c1.execute();
System.out.println(r1 + "cost time:" + (System.currentTimeMillis() - beginTime) + "ms");
//第二次请求
String r2 = c2.execute();
System.out.println(r2 + "cost time:" + (System.currentTimeMillis() - beginTime) + "ms");
//第三次请求
String r3 = c3.execute();
System.out.println(r3 + "cost time:" + (System.currentTimeMillis() - beginTime) + "ms");
//请求上下文关闭
requestContext.close();
}
}
熔断
熔断器是一个开关,用来控制流量是否执行业务逻辑。
熔断器核心指标
指标 | 含义 |
请求时间窗 | 一个时间段 |
请求总数阈值 | 核心时间窗内有多少请求总数 |
错误百分比阈值 | 当达到请求总数阈值,同时错误达到一定比例,触发熔断开关 |
熔断器状态
状态 | 含义 |
开启 | 所有请求都会进入fallback方法 |
半开启 | 间歇性让请求触发run方法,成功则关闭熔断器 |
关闭 | 正常处理业务请求 |
默认情况下,熔断器开启5秒后进入半开启状态,熔断器的计算是有耗时的,等待熔断健康检查。熔断器是可以强制手段开启或关闭。