说明
- 本文将通过源码分析 Apache ShenYu 网关通过 http 长轮询数据同步
- shenyu 网关用http长轮询方式同步数据需要分别在shenyu-admin以及shenyu-bootsrap项目中添加相关配置和依赖
- shenyu 借鉴了 Apollo、Nacos 的设计思想,取其精华,自己实现的 http 长轮询数据同步功能
- 注意,这里并非传统的 ajax 长轮询
http长轮询数据同步流程图(来自shenyul官方)
shenyu-admin,请求 admin 配置服务超时时间是90秒,也就意味着网关曾请求配置服务最多会等90秒,官方说明这便于 admin 配置服务及时响应变更数据,从而实现准实时推送。shenyu-web网关服务请求 admin 配置服务并不是立刻响应返回,而是利用Servelt3.0异步机制,将请求存到本地BlocingQueue队列中,并开启任务调度60秒后执行。为什么60秒?在下面源码分析会进一步阐述。
源码分析
shenyu-admin端关于http数据同步
在shenyu-admin源码中,通过springboot web 给提供了查询配置和监听配置的接口。HttpLongPollingDataChangedListener是处理http长连接数据推送的类,在核心方法doLongPolling(……)中通过ScheduledExecutorService开启调度,60秒后执行。60秒后返回数据。注意,这里返回的不是具体的配置信息,而是返回的配置分组信息,shenyu-web 网关会根据配置分组再调用 shenyu-admin提供的配置查询接口。
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
//验证配置变更,若shenyu-web未收到某个配置变更的通知,md5不一致,则立即响应
List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
String clientIp = getRemoteIp(request);
if (CollectionUtils.isNotEmpty(changedGroup)) {
this.generateResponse(response, changedGroup);
log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
return;
}
//获得servlet3.0异步响应http请求
final AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0L);
//立即执行任务调度
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
在LongPollingClient里,开启了任务调度,60秒后执行。并把当前的clients存到本地BlockingQueue队列中。
class LongPollingClient implements Runnable {
……
@Override
public void run() {
//60秒后执行
this.asyncTimeoutFuture = scheduler.schedule(() -> {
//将队列中移除当前任务
clients.remove(LongPollingClient.this);
//获取本地当前缓存中所有的配置分组信息
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
//发送响应给shenyu-web网关调用者
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);
clients.add(this);
}
}
shenyu-web网关关于http数据同步
shenyu-web通过http轮询的方式去请求sou-admin提供出的配置监听接口(/configs/listener),从上面的源码分析得知,轮询中每次请求至少60秒以后才会得到响应,响应的数据是配置分组信息,shenyu-web再用请求到的响应值groupkey集合去请求shenyu-admin提供查询当前最新配置的接口 -> /configs/fetch获取最新配置。
shenyu网关关于http长连接同步数据的spring-boot-start是shenyu-spring-boot-starter-sync-data-http项目。它装配了HttpSyncDataService类。在这里面开启了http轮询。下面贴出核心源码,并给上了详实的注释
public class HttpSyncDataService implements SyncDataService, AutoCloseable {
……
private void start() {
//RUNNING是AtomicBoolean,默认false。
if (RUNNING.compareAndSet(false, true)) {
//请求 shenyu-web的/configs/fetch接口获取最新配置
this.fetchGroupConfig(ConfigGroupEnum.values());
……
//开启轮询,若配置了多个shenyu-admin服务,每个服务都会开启轮询
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
}
……
}
//请求sou-admin提供出的配置监听接口
private void doLongPolling(final String server) {
……
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
……
//这里用的hard code……
String listenerUrl = server + "/configs/listener";
try {
//执行请求
String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
……
}
……
if (groupJson != null) {
//将响应转换为配置分组枚举类
ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
if (ArrayUtils.isNotEmpty(changedGroups)) {
//通过配置分组去请求shenyu-admin /configs/fetch接口获取最新配置
this.doFetchGroupConfig(server, changedGroups);
}
}
}
//通过配置分组去请求shenyu-admin /configs/fetch接口获取最新配置
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
StringBuilder params = new StringBuilder();
for (ConfigGroupEnum groupKey : groups) {
params.append("groupKeys").append("=").append(groupKey.name()).append("&");
}
//这里又是hard code
String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
try {
//根据配置分组信息获取最新配置信息
json = this.httpClient.getForObject(url, String.class);
}
……
// 更新shenyu网关本地缓存配置
boolean updated = this.updateCacheWithJson(json);
……
}
}
http长轮询数据同步机制,先获取配置group信息,再用group信息作为参数请求shenyu-admin的 fetch接口获取配置具体信息。为什么要通过两次请求来同步呢?官方解释是
因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。