Apache ShenYu 网关 divide插件底层原理分析

说明

  • 本文将分析 Apache ShenYu 网关 divide 插件底层原理、负载均衡、ip端口探活

divide插件底层原理分析

插件功能说明

divide插件是进行http正向代理,所有的http请求都由该插件进行负载均衡调用。具体的负载均衡策略在规则中指定。配置详解:

  • 第一个框:hostName,一般填写 localhost,该字段暂时没使用。
  • 第二个框:http协议,一般填写 http:// 或者 https:// ,不填写默认为:http://
  • 第三个框:ip与端口,这里填写你真实服务的 ip + 端口。
插件源码分析

插件配置

  • 首先在shenyu-admin的插件管理中开启divide插件
  • shenyu-bootstrap中加上 divide插件的依赖
    <!--if you use http proxy start this-->
    <dependency>
        <groupId>org.apache.shenyu</groupId>
        <artifactId>shenyu-spring-boot-starter-plugin-divide</artifactId>
        <version>${last.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.shenyu</groupId>
        <artifactId>shenyu-spring-boot-starter-plugin-httpclient</artifactId>
        <version>${last.version}</version>
    </dependency>
  • http客户端配置见Apache ShenYu 官方examples体验系列文章
divide插件源码分析

shenyu 网关的核心逻辑是: http请求 shenyu网关 --> shenyu网关根据请求路径解析选择器、插件匹配 --> 匹配到具体插件执行插件逻辑 or 返回异常,未找着选择器。我们 divide插件核心类是 DividePlugin,接下来分析它的核心源码:

public class DividePlugin extends AbstractShenyuPlugin {

    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
        ……

        final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
        //根据选择器id查上游服务列表,用作负载均衡(UpstreamCacheManager里做了ip端口探活)
        final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
        ……
        //用负载均衡工具类选择当前上游服务信息
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);

        ……
        //封装 上游信息到 spring-web里
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
        // set the http timeout
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        //请求上游服务
        return chain.execute(exchange);
    }
}
上游服务ip端口探活

在请求进入到 DividePlugin 插件后,跟根据根据选择器id在UpstreamCacheManager 对象里查上游服务列表,用作负载均衡。在UpstreamCacheManager 类里开启了周期定时任务做上游服务的端口探活。

探活核心源码分析:

 public static boolean checkUrl(final String url) {
        if (StringUtils.isBlank(url)) {
            return false;
        }
        if (checkIP(url)) {
            ……
            /**
            *探活 
            *  socket.connect(new InetSocketAddress(host, port));
            */
            return isHostConnector(hostPort[0], Integer.parseInt(hostPort[1]));
        } else {
            /**
            *探活 
            * InetAddress.getByName(host).isReachable(1000);
            */
            return isHostReachable(url);
        }
}
上游信息负载均衡

从 DividePlugin 得知,负载上游信息是通过 LoadBalanceUtils.selector()方法,在该方法里,会根据规则配置的负载均衡类型而用具体的负载均衡类在完成负载,
负载策略有:

  • random : 随机
  • roudRobin : 轮询
  • hash : 哈希

随机算法
随机算法在 RandomLoadBalance里,它的核心逻辑是:求出所有上游服务的总权重,如果总权重不大于0,且所有权重都相等,则用 Random.nextInt(upstreamServers.size)随机取一个上游服务;如果总权重大于0,且所有权重不相同,则在总权重范围取一个随机数,然后用这个随机数减去每个上游服务的权重,第一个让这个随机数小于0的就被选中,我们来看它的核心代码

@Join
public class RandomLoadBalance extends AbstractLoadBalance {

    ……

    @Override
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        ……
        //求出所有上游服务的总权重
        int totalWeight = calculateTotalWeight(upstreamList);
        //计算所有上游服务的权重是否相同
        boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
        if (totalWeight > 0 && !sameWeight) {
            //根据权重随机
            return random(totalWeight, upstreamList);
        }
        //在总权重不大于0以及所有权重都相同的时候,就随机选一个上游服务
        return random(upstreamList);
    }

    ……

    private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
        // 如果所有权重不相同,以及总权重大于0 ,则在总权重范围里随机。
        int offset = RANDOM.nextInt(totalWeight);
        for (DivideUpstream divideUpstream : upstreamList) {
             // 确定随机值落在那一段上(当前上游服务权重值越大,越大概率会被选择。权重值越大,减去offset越容易小于0)
            offset -= getWeight(divideUpstream);
            if (offset < 0) {
                return divideUpstream;
            }
        }
        return upstreamList.get(0);

    ……
}

轮询算法
轮询算法在RoundRobinLoadBalance类中,它是的核心逻辑是:遍历该请求匹配的所有上游服务设置,为每个上游服务新增权重轮询类,并设置元数据权重和当前权重, 当服务权重最大时,当前权重 = -1 * 总权重;当服务权重不是最大时,当前权重 = 当前权重 + 元数据权重。这样子就能实现根据权重来做轮询了。以下为具体代码分析:

@Join
public class RoundRobinLoadBalance extends AbstractLoadBalance {

    ……
    @Override
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        //根据上游服务列表第一个服务获取轮询类中缓存的上游服务信息
        String key = upstreamList.get(0).getUpstreamUrl();
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        //不存在就创建一个
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
            map = methodWeightMap.get(key);
        }
        //总权重
        int totalWeight = 0;
        //当前最大值(初始化取最小的long值)
        long maxCurrent = Long.MIN_VALUE;
        ……
        //选择的上游服务
        DivideUpstream selectedInvoker = null;
        //选择的最大权重轮询对象
        WeightedRoundRobin selectedWRR = null;
        //遍历上游服务
        for (DivideUpstream upstream : upstreamList) {
            String rKey = upstream.getUpstreamUrl();
            WeightedRoundRobin weightedRoundRobin = map.get(rKey);
            ……
            /**
            * 根据权重增加当前上游轮询判断的值
            *     long increaseCurrent() {
            *        return current.addAndGet(weight);
            *    }
            */
            long cur = weightedRoundRobin.increaseCurrent();
            //设置最后更新时间
            weightedRoundRobin.setLastUpdate(now);
            //如果 当前服务轮询判断值大于最大值,则重置 当前最大值以及当前选择的上游服务和上游权重轮询类
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = upstream;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight += weight;
        }
        // 当上游服务列表发生变化时,更新轮询对象里缓存的对象
        if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
            try {
                // 将之前缓存的对象拷贝到新的对象里
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                // 移除60秒内都没有访问到的上游对象(这里更新逻辑会不会有点问题?当服务请求不频繁的时候,很多服务都可能超过了60秒没访问,但并一定就是soul-admin里移除了的上游服务),下次请求过来后,如果本地缓存被移除的上游服务还存在,重新创建权重轮询对象并put到本地缓存中
                newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
                methodWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }
        if (selectedInvoker != null) {
            /**
            * 当经过上面的循环选出的最大权重上游服务后,将该服务的轮询对象里的当前值设为 负的最大权重值,下次它就排到轮询的末尾了,但是因为它的原始权重大,它在经过 weightedRoundRobin.increaseCurrent()后,它的轮询对象当前值会更快的再次成为最大值,也就实现了权重轮询
            *
            *  void sel(final int total) {
            *      current.addAndGet(-1 * total);
            *   }
            *
            */
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return upstreamList.get(0);
    }
 }

哈希算法
哈希算法的核心类是HashLoadBalance,它的核心逻辑是:将每个上游服务以url和某些常量拼接起来计算哈希值,以该哈希值为key存到ConcurrentSkipListMap中;再将客户端请求到ip地址求hash,并用该hash值去取上游服务值 --> treeMap.tailMap(hash); 核心代码:

public class HashLoadBalance extends AbstractLoadBalance {
    ……
    @Override
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
        for (DivideUpstream address : upstreamList) {
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                long addressHash = hash("SHENYU-" + address.getUpstreamUrl() + "-HASH-" + i);
                // 将每个上游服务以url和某些常量拼接起来计算哈希值,并存在Map中
                treeMap.put(addressHash, address);
            }
        }

        // 根据客户端请求的Ip计算 hash值
        long hash = hash(String.valueOf(ip));
        //获取其键大于等于hash值的集合
        SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
        if (!lastRing.isEmpty()) {
            //返回第一个key的值
            return lastRing.get(lastRing.firstKey());
        }
        return treeMap.firstEntry().getValue();
    }
    ……
}

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注