说明
- 本文将描述 Apache ShenYu 网关 hystrix、sentinel、resilience4j插件使用,以及 源码分析
hystrix 插件
hystrix 插件使用
插件概述
- shenyu 网关 hystrix插件是用来对网关流量进行熔断的核心实现。
- 它使用信号量的方式来处理请求
插件使用
- 在 shenyu-admin –> 插件管理 –> hystrix,设置为开启。
- 如果用户不使用,则在 shenyu-admin 后台把此插件停用。因为 shenyu 网关的插件是基于责任链模式的,若是开启了多个熔断插件,且请求路径匹配到一致,则要进行多重熔断过滤,影响性能。
- 在 shenyu-bootstarp 里添加 hystrix 相关依赖,并重启改服务
<!-- shenyu hystrix plugin start--> <dependency> <groupId>org.apache.shenyu</groupId> <artifactId>shenyu-spring-boot-starter-plugin-hystrix</artifactId> <version>${last.version}</version> </dependency> <!-- shenyu hystrix plugin end-->
- 选择器请参考Apache ShenYu 网关选择器和规则解析
- 规则参数说明:
- 跳闸最小请求数量 :最小的请求量,至少要达到这个量才会触发熔断
- 错误半分比阀值 : 这段时间内,发生异常的百分比。
- 最大并发量 : 最大的并发量
- 跳闸休眠时间(ms) :熔断以后恢复的时间。
- 分组Key: 一般设置为:contextPath
- 命令Key: 一般设置为具体的 路径接口。
- 失败降级URL: 请求被熔断后请求的url
- 当选择器和规则配置好了后,我们并发请求网关,匹配到插件后就会做相应的熔断处理
压测数据
跳闸最小请求数量设置为10,隔离模式设置为thread, 也就是说并发超过10的都抛弃掉了
wrk -t12 -c400 -d10s http://localhost:9195/http/order/findById\?id\=10
Running 10s test @ http://localhost:9195/http/order/findById?id=10
12 threads and 400 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 19.18ms 17.40ms 161.28ms 81.18%
Req/Sec 1.09k 479.15 2.60k 60.83%
130096 requests in 10.03s, 20.66MB read
Socket errors: connect 158, read 99, write 0, timeout 0
Non-2xx or 3xx responses: 99957
Requests/sec: 12975.89
Transfer/sec: 2.06MB
hystrix 插件源码分析
hystrix插件源码在 shenyu-plugin-hystrix 模块,它的核心原理是:当请求匹配到 hystrix插件后,判断格式模式。当隔离模式为线程池模式时,会创建一个 HystrixCommandOnThread类,它继承自 hystrix 的 HystrixCommand 类,它的底层就是队列形式的线程池去执行;当格式模式为信号量模式时,会创建继承自HystrixObservableCommand的处理类,它底层则实现了信号量的机制。核心源码如下:
public class HystrixPlugin extends AbstractShenyuPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
……
// 解析HytrixHandle
final HystrixHandle hystrixHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), HystrixHandle.class);
……
//根据隔离模式创建 hystrix对应的熔断处理类,有线程池模式和信号量模式
Command command = fetchCommand(hystrixHandle, exchange, chain);
return Mono.create(s -> {
// 执行 hystrix 熔断相关逻辑
Subscription sub = command.fetchObservable().subscribe(s::success,
s::error, s::success);
s.onCancel(sub::unsubscribe);
//如果断路器打开,打印相关信息
if (command.isCircuitBreakerOpen()) {
log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey());
}
}).doOnError(throwable -> {
//hystrix 执行异常
……
}).then();
}
//根据隔离模式创建 hystrix对应的熔断处理类,有线程池模式和信号量模式
private Command fetchCommand(final HystrixHandle hystrixHandle, final ServerWebExchange exchange, final ShenyuPluginChain chain) {
if (hystrixHandle.getExecutionIsolationStrategy() == HystrixIsolationModeEnum.SEMAPHORE.getCode()) {
// 信号量隔离模式
return new HystrixCommand(HystrixBuilder.build(hystrixHandle),
exchange, chain, hystrixHandle.getCallBackUri());
}
// 线程池模式
return new HystrixCommandOnThread(HystrixBuilder.buildForHystrixCommand(hystrixHandle),
exchange, chain, hystrixHandle.getCallBackUri());
}
}
sentinel 插件
sentinel 插件使用
插件概述
sentinel插件是 shenyu 网关对流量熔断的另一支持插件。
插件使用
- 在 shenyu-admi -> 插件管理 -> sentinel -> 编辑 -> 开启
- 在 shenyu-bootstrap 中加入 sentinel插件的依赖,并重启 shenyu-bootstrap项目
<!-- shenyu sentinel plugin start--> <dependency> <groupId>org.apache.shenyu</groupId> <artifactId>shenyu-spring-boot-starter-plugin-sentinel</artifactId> <version>${last.version}</version> </dependency> <!-- shenyu sentinel plugin end-->
参数说明
- 选择器请参考Apache ShenYu 网关选择器和规则解析
- 规则参数说明:
- whether to open the degrade (1 or 0) :是否开启sentinel的流控(1开启,0关闭)。
- 熔断类型(degrade type):
- slow call ratio : 慢启动比例
- exception ratio : 异常比例
- exception number strategy : 异常数量
- degrade count : 熔断的数量(达到改并发以上就降级?)
- degrade window size: 熔断的时间窗口
- grade count : 流过记数
- whether control behavior is enabled (1 or 0) :限流规则启动
- 限流阈值类型 (grade type):
- QPS : req/sec = 请求数/秒
- number of concurrent threads: 并发线程数
- 流控效果(control behavior):
- direct rejection by default: 默认直接拒绝
- warm up: 预热
- constant speed queuing:恒速排队
- preheating uniformly queued: 预热恒速排队
压测数据
wrk -t12 -c400 -d10s http://localhost:9195/http/order/findById\?id\=10
Running 10s test @ http://localhost:9195/http/order/findById?id=10
12 threads and 400 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 71.57ms 118.08ms 624.53ms 83.81%
Req/Sec 692.21 409.15 2.24k 66.61%
82006 requests in 10.07s, 13.45MB read
Socket errors: connect 158, read 137, write 0, timeout 14
Non-2xx or 3xx responses: 82006
Requests/sec: 8145.77
Transfer/sec: 1.34MB
sentinel 插件源码解析
sentinel 插件熔断限流的核心原理是在 shenyu 的插件链中的 SentinelPlugin里,通过集成 alibaba sentinel相关组件来实现。核心源码如下:
public class SentinelPlugin extends AbstractShenyuPlugin {
……
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
……
//从规则对象中获取 sentinelHandle相关数据
SentinelHandle sentinelHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), SentinelHandle.class);
//SentinelReactorTransformer 就是 alibaba sentinel 的核心类,通过它实现限流熔断
return chain.execute(exchange).transform(new SentinelReactorTransformer<>(resourceName)).doOnSuccess(v -> {
……
}).onErrorResume(throwable -> sentinelFallbackHandler.fallback(exchange, UriUtils.createUri(sentinelHandle.getFallbackUri()), throwable));
}
}
resilience4j 插件
resilience4j 插件使用
resilience4j 插件概述
resilience4j 也是一款熔断限流框架,shenyu 网关以插件的方式将其接入网关中,为 shenyu 网关提供熔断限流功能。
插件使用
- shenyu-admin -> 插件管理 -> resilience4j -> 编辑 -> 打开
- 在 shenyu-bootstrap项目中引入 resilience4j 依赖,并重启该项目
<!-- shenyu resilience4j plugin start--> <dependency> <groupId>org.apache.shenyu</groupId> <artifactId>shenyu-spring-boot-starter-plugin-resilience4j</artifactId> <version>${last.version}</version> </dependency> <!-- shenyu resilience4j plugin end-->
参数说明
- 选择器请参考Apache ShenYu 网关选择器和规则解析
- 规则参数说明:
- token filling period (ms): 刷新令牌的时间间隔,单位ms,默认值:500。
- token filling number: 每次刷新令牌的数量
- control behavior timeout (ms):等待获取令牌的超时时间,单位ms,默认值:5000。
- circuit enable: 是否开启熔断,0:关闭,1:开启,默认值:0。
- circuit timeout (ms): 熔断超时时间,单位ms,默认值:30000。
- fallback uri:降级处理的uri。
- sliding window size:滑动窗口大小,默认值:100。
- sliding window type: 滑动窗口类型,0:基于计数,1:基于时间,默认值:0。
- enabled error minimum calculation threshold:开启熔断的最小请求数,超过这个请求数才开启熔断统计,默认值:100。
- degrade opening duration(ms):断器开启持续时间,单位ms,默认值:10。
- half open threshold: 半开状态下的环形缓冲区大小,必须达到此数量才会计算失败率,默认值:10。
- degrade failure rate: 错误率百分比,达到这个阈值,熔断器才会开启,默认值50。
压测数据
吞吐量挺高的原因是,触发的熔断,很多请求直接拒绝了
wrk -t12 -c400 -d10s http://localhost:9195/http/order/findById\?id\=10
Running 10s test @ http://localhost:9195/http/order/findById?id=10
12 threads and 400 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 115.84ms 244.46ms 1.12s 86.53%
Req/Sec 1.25k 773.87 3.31k 64.29%
126420 requests in 10.08s, 20.71MB read
Socket errors: connect 158, read 136, write 0, timeout 0
Non-2xx or 3xx responses: 126420
Requests/sec: 12544.21
Transfer/sec: 2.06MB
io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker '1355060043714682880_/http/order' is OPEN and does not permit further calls
at io.github.resilience4j.circuitbreaker.CallNotPermittedException.createCallNotPermittedException(CallNotPermittedException.java:48) ~[resilience4j-circuitbreaker-1.6.1.jar:1.6.1]
……
resilience4j 插件源码分析
shenyu 网关以插件的形式将 resilience4j相关组件集成了进去,为 shenyu 网关提供 resilience4j熔断的功能。resilience4j插件核心类是CombinedExecutor类,在该类中创建了resilience4j.CircuitBreaker对象,以此为 shenyu resilience4j插件赋能。
public class CombinedExecutor implements Executor {
@Override
public <T> Mono<T> run(final Mono<T> run, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf resilience4JConf) {
//获取 resilience4j 限流对象,采用令牌桶算法,恒定速度生成令牌,放到令牌桶,消费端去取令牌。当令牌桶内无令牌,则拒绝
RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(resilience4JConf.getId(), resilience4JConf.getRateLimiterConfig());
//获取 resilience4j 断路器对象
CircuitBreaker circuitBreaker = Resilience4JRegistryFactory.circuitBreaker(resilience4JConf.getId(), resilience4JConf.getCircuitBreakerConfig());
//根据限流和断路器对象的功能逻辑去执行
Mono<T> to = run.transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
.transformDeferred(RateLimiterOperator.of(rateLimiter))
.timeout(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration())
.doOnError(TimeoutException.class, t -> circuitBreaker.onError(
resilience4JConf.getTimeLimiterConfig().getTimeoutDuration().toMillis(),
TimeUnit.MILLISECONDS,
t));
……
}
}