说明
- 本文将分析 Apache ShenYu 网关 divide 插件底层原理、负载均衡、ip端口探活
divide插件底层原理分析
插件功能说明
divide插件是进行http正向代理,所有的http请求都由该插件进行负载均衡调用。具体的负载均衡策略在规则中指定。配置详解:
- 第一个框:hostName,一般填写 localhost,该字段暂时没使用。
- 第二个框:http协议,一般填写 http:// 或者 https:// ,不填写默认为:http://
- 第三个框:ip与端口,这里填写你真实服务的 ip + 端口。
插件源码分析
插件配置
- 首先在shenyu-admin的插件管理中开启divide插件
- shenyu-bootstrap中加上 divide插件的依赖
<!--if you use http proxy start this--> <dependency> <groupId>org.apache.shenyu</groupId> <artifactId>shenyu-spring-boot-starter-plugin-divide</artifactId> <version>${last.version}</version> </dependency> <dependency> <groupId>org.apache.shenyu</groupId> <artifactId>shenyu-spring-boot-starter-plugin-httpclient</artifactId> <version>${last.version}</version> </dependency>
- http客户端配置见Apache ShenYu 官方examples体验系列文章
divide插件源码分析
shenyu 网关的核心逻辑是: http请求 shenyu网关 --> shenyu网关根据请求路径解析选择器、插件匹配 --> 匹配到具体插件执行插件逻辑 or 返回异常,未找着选择器。我们 divide插件核心类是 DividePlugin,接下来分析它的核心源码:
public class DividePlugin extends AbstractShenyuPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
……
final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
//根据选择器id查上游服务列表,用作负载均衡(UpstreamCacheManager里做了ip端口探活)
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
……
//用负载均衡工具类选择当前上游服务信息
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
……
//封装 上游信息到 spring-web里
exchange.getAttributes().put(Constants.HTTP_URL, realURL);
// set the http timeout
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
//请求上游服务
return chain.execute(exchange);
}
}
上游服务ip端口探活
在请求进入到 DividePlugin 插件后,跟根据根据选择器id在UpstreamCacheManager 对象里查上游服务列表,用作负载均衡。在UpstreamCacheManager 类里开启了周期定时任务做上游服务的端口探活。
探活核心源码分析:
public static boolean checkUrl(final String url) {
if (StringUtils.isBlank(url)) {
return false;
}
if (checkIP(url)) {
……
/**
*探活
* socket.connect(new InetSocketAddress(host, port));
*/
return isHostConnector(hostPort[0], Integer.parseInt(hostPort[1]));
} else {
/**
*探活
* InetAddress.getByName(host).isReachable(1000);
*/
return isHostReachable(url);
}
}
上游信息负载均衡
从 DividePlugin 得知,负载上游信息是通过 LoadBalanceUtils.selector()方法,在该方法里,会根据规则配置的负载均衡类型而用具体的负载均衡类在完成负载,
负载策略有:
- random : 随机
- roudRobin : 轮询
- hash : 哈希
随机算法
随机算法在 RandomLoadBalance里,它的核心逻辑是:求出所有上游服务的总权重,如果总权重不大于0,且所有权重都相等,则用 Random.nextInt(upstreamServers.size)随机取一个上游服务;如果总权重大于0,且所有权重不相同,则在总权重范围取一个随机数,然后用这个随机数减去每个上游服务的权重,第一个让这个随机数小于0的就被选中,我们来看它的核心代码
@Join
public class RandomLoadBalance extends AbstractLoadBalance {
……
@Override
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
……
//求出所有上游服务的总权重
int totalWeight = calculateTotalWeight(upstreamList);
//计算所有上游服务的权重是否相同
boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
if (totalWeight > 0 && !sameWeight) {
//根据权重随机
return random(totalWeight, upstreamList);
}
//在总权重不大于0以及所有权重都相同的时候,就随机选一个上游服务
return random(upstreamList);
}
……
private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
// 如果所有权重不相同,以及总权重大于0 ,则在总权重范围里随机。
int offset = RANDOM.nextInt(totalWeight);
for (DivideUpstream divideUpstream : upstreamList) {
// 确定随机值落在那一段上(当前上游服务权重值越大,越大概率会被选择。权重值越大,减去offset越容易小于0)
offset -= getWeight(divideUpstream);
if (offset < 0) {
return divideUpstream;
}
}
return upstreamList.get(0);
……
}
轮询算法
轮询算法在RoundRobinLoadBalance类中,它是的核心逻辑是:遍历该请求匹配的所有上游服务设置,为每个上游服务新增权重轮询类,并设置元数据权重和当前权重, 当服务权重最大时,当前权重 = -1 * 总权重;当服务权重不是最大时,当前权重 = 当前权重 + 元数据权重。这样子就能实现根据权重来做轮询了。以下为具体代码分析:
@Join
public class RoundRobinLoadBalance extends AbstractLoadBalance {
……
@Override
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
//根据上游服务列表第一个服务获取轮询类中缓存的上游服务信息
String key = upstreamList.get(0).getUpstreamUrl();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
//不存在就创建一个
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
map = methodWeightMap.get(key);
}
//总权重
int totalWeight = 0;
//当前最大值(初始化取最小的long值)
long maxCurrent = Long.MIN_VALUE;
……
//选择的上游服务
DivideUpstream selectedInvoker = null;
//选择的最大权重轮询对象
WeightedRoundRobin selectedWRR = null;
//遍历上游服务
for (DivideUpstream upstream : upstreamList) {
String rKey = upstream.getUpstreamUrl();
WeightedRoundRobin weightedRoundRobin = map.get(rKey);
……
/**
* 根据权重增加当前上游轮询判断的值
* long increaseCurrent() {
* return current.addAndGet(weight);
* }
*/
long cur = weightedRoundRobin.increaseCurrent();
//设置最后更新时间
weightedRoundRobin.setLastUpdate(now);
//如果 当前服务轮询判断值大于最大值,则重置 当前最大值以及当前选择的上游服务和上游权重轮询类
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = upstream;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
// 当上游服务列表发生变化时,更新轮询对象里缓存的对象
if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
// 将之前缓存的对象拷贝到新的对象里
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
// 移除60秒内都没有访问到的上游对象(这里更新逻辑会不会有点问题?当服务请求不频繁的时候,很多服务都可能超过了60秒没访问,但并一定就是soul-admin里移除了的上游服务),下次请求过来后,如果本地缓存被移除的上游服务还存在,重新创建权重轮询对象并put到本地缓存中
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
if (selectedInvoker != null) {
/**
* 当经过上面的循环选出的最大权重上游服务后,将该服务的轮询对象里的当前值设为 负的最大权重值,下次它就排到轮询的末尾了,但是因为它的原始权重大,它在经过 weightedRoundRobin.increaseCurrent()后,它的轮询对象当前值会更快的再次成为最大值,也就实现了权重轮询
*
* void sel(final int total) {
* current.addAndGet(-1 * total);
* }
*
*/
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return upstreamList.get(0);
}
}
哈希算法
哈希算法的核心类是HashLoadBalance,它的核心逻辑是:将每个上游服务以url和某些常量拼接起来计算哈希值,以该哈希值为key存到ConcurrentSkipListMap中;再将客户端请求到ip地址求hash,并用该hash值去取上游服务值 --> treeMap.tailMap(hash); 核心代码:
public class HashLoadBalance extends AbstractLoadBalance {
……
@Override
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
for (DivideUpstream address : upstreamList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHENYU-" + address.getUpstreamUrl() + "-HASH-" + i);
// 将每个上游服务以url和某些常量拼接起来计算哈希值,并存在Map中
treeMap.put(addressHash, address);
}
}
// 根据客户端请求的Ip计算 hash值
long hash = hash(String.valueOf(ip));
//获取其键大于等于hash值的集合
SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
if (!lastRing.isEmpty()) {
//返回第一个key的值
return lastRing.get(lastRing.firstKey());
}
return treeMap.firstEntry().getValue();
}
……
}