Apache ShenYu 网关数据同步之http

说明

  • 本文将通过源码分析 Apache ShenYu 网关通过 http 长轮询数据同步
  • shenyu 网关用http长轮询方式同步数据需要分别在shenyu-admin以及shenyu-bootsrap项目中添加相关配置和依赖
  • shenyu 借鉴了 Apollo、Nacos 的设计思想,取其精华,自己实现的 http 长轮询数据同步功能
  • 注意,这里并非传统的 ajax 长轮询

http长轮询数据同步流程图(来自shenyul官方)

shenyu-admin,请求 admin 配置服务超时时间是90秒,也就意味着网关曾请求配置服务最多会等90秒,官方说明这便于 admin 配置服务及时响应变更数据,从而实现准实时推送。shenyu-web网关服务请求 admin 配置服务并不是立刻响应返回,而是利用Servelt3.0异步机制,将请求存到本地BlocingQueue队列中,并开启任务调度60秒后执行。为什么60秒?在下面源码分析会进一步阐述。

源码分析

shenyu-admin端关于http数据同步

在shenyu-admin源码中,通过springboot web 给提供了查询配置和监听配置的接口。HttpLongPollingDataChangedListener是处理http长连接数据推送的类,在核心方法doLongPolling(……)中通过ScheduledExecutorService开启调度,60秒后执行。60秒后返回数据。注意,这里返回的不是具体的配置信息,而是返回的配置分组信息,shenyu-web 网关会根据配置分组再调用 shenyu-admin提供的配置查询接口。

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {

        //验证配置变更,若shenyu-web未收到某个配置变更的通知,md5不一致,则立即响应
        List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
        String clientIp = getRemoteIp(request);
        if (CollectionUtils.isNotEmpty(changedGroup)) {
            this.generateResponse(response, changedGroup);
            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
            return;
        }

        //获得servlet3.0异步响应http请求
        final AsyncContext asyncContext = request.startAsync();
        asyncContext.setTimeout(0L);
        //立即执行任务调度
        scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}

在LongPollingClient里,开启了任务调度,60秒后执行。并把当前的clients存到本地BlockingQueue队列中。

class LongPollingClient implements Runnable {
  ……
 @Override
    public void run() {
        //60秒后执行
        this.asyncTimeoutFuture = scheduler.schedule(() -> {
            //将队列中移除当前任务
            clients.remove(LongPollingClient.this);
            //获取本地当前缓存中所有的配置分组信息
            List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());

            //发送响应给shenyu-web网关调用者
            sendResponse(changedGroups);
        }, timeoutTime, TimeUnit.MILLISECONDS); 
        clients.add(this);
    }   
}
shenyu-web网关关于http数据同步

shenyu-web通过http轮询的方式去请求sou-admin提供出的配置监听接口(/configs/listener),从上面的源码分析得知,轮询中每次请求至少60秒以后才会得到响应,响应的数据是配置分组信息,shenyu-web再用请求到的响应值groupkey集合去请求shenyu-admin提供查询当前最新配置的接口 -> /configs/fetch获取最新配置。

shenyu网关关于http长连接同步数据的spring-boot-start是shenyu-spring-boot-starter-sync-data-http项目。它装配了HttpSyncDataService类。在这里面开启了http轮询。下面贴出核心源码,并给上了详实的注释

public class HttpSyncDataService implements SyncDataService, AutoCloseable {
    ……
    private void start() {
       //RUNNING是AtomicBoolean,默认false。
        if (RUNNING.compareAndSet(false, true)) {
            //请求 shenyu-web的/configs/fetch接口获取最新配置
            this.fetchGroupConfig(ConfigGroupEnum.values());
            ……

            //开启轮询,若配置了多个shenyu-admin服务,每个服务都会开启轮询
            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
        } 
        ……
    } 

    //请求sou-admin提供出的配置监听接口
    private void doLongPolling(final String server) {
        ……
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
        ……  
        //这里用的hard code……
        String listenerUrl = server + "/configs/listener";
        try {
            //执行请求
            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
             ……  
         }
        ……
        if (groupJson != null) {
            //将响应转换为配置分组枚举类
            ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
            if (ArrayUtils.isNotEmpty(changedGroups)) {
                //通过配置分组去请求shenyu-admin /configs/fetch接口获取最新配置
                this.doFetchGroupConfig(server, changedGroups);
            }
        }
    } 

    //通过配置分组去请求shenyu-admin /configs/fetch接口获取最新配置
    private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
        StringBuilder params = new StringBuilder();
        for (ConfigGroupEnum groupKey : groups) {
                       params.append("groupKeys").append("=").append(groupKey.name()).append("&");
        }
        //这里又是hard code
        String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
        try {
            //根据配置分组信息获取最新配置信息
            json = this.httpClient.getForObject(url, String.class);
        } 
        ……
        // 更新shenyu网关本地缓存配置
        boolean updated = this.updateCacheWithJson(json);
        ……
    }

}

http长轮询数据同步机制,先获取配置group信息,再用group信息作为参数请求shenyu-admin的 fetch接口获取配置具体信息。为什么要通过两次请求来同步呢?官方解释是

因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注