说明
- 本文将分析shenyu-admin与shenyu网关通过zookeeper数据同步的底层原理
源码分析
shenyu-admin与shenyul网关通过zookeeper方式数据同步,它们两端都应该有对应处理逻辑的源码。我们首先来看shenyu-admin这端的
shenyu-admin端zookeeper数据同步源码分析
从上文里我们得知,shenyu-admin关于数据同步的类是DataSyncConfiguration,那么我们现在来关注这里面关于zookeeper的代码
@Configuration
@ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {
@Bean
@ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
return new ZookeeperDataChangedListener(zkClient);
}
@Bean
@ConditionalOnMissingBean(ZookeeperDataInit.class)
public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
return new ZookeeperDataInit(zkClient, syncDataService);
}
}
源码的注释已经很明晰:
- @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url"): 只有配置了shenyu.sync.zookeeper.url属性,才装配改类
- ZookeeperDataChangedListener:zookeeper数据变动监听对象
- ZookeeperDataInit:zookeeper数据初始化数据类
所以我们要开启zookeeper同步方式,要在yml配置文件里加上上述zookeeper配置
我们进一步分析ZookeeperDataInit对象,初始化数据做了什么。我们创建ZookeeperDataInit对象时,给该对象构造注入了zookeeper客户端对象以及异步数据同步对象,SyncDataService的实现类是SyncDataServiceImpl。我们现在就看下ZookeeperDataInit源码
public class ZookeeperDataInit implements CommandLineRunner {
……
//该类实现了CommandLineRunner,当该对象被spring装配后就会执行run方法
@Override
public void run(final String... args) {
String pluginPath = ZkPathConstants.PLUGIN_PARENT;
String authPath = ZkPathConstants.APP_AUTH_PARENT;
String metaDataPath = ZkPathConstants.META_DATA;
if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
//数据同步,传递了REFRESH 刷新枚举值
syncDataService.syncAll(DataEventTypeEnum.REFRESH);
}
}
}
SyncDataServiceImpl源码分析:
@Service("syncDataService")
public class SyncDataServiceImpl implements SyncDataService {
@Override
public boolean syncAll(final DataEventTypeEnum type) {
//认证相关信息同步(查库得到认证相关信息,并通过 eventbus发布更新事件)
appAuthService.syncData();
//获取插件数据(查库)
List<PluginData> pluginDataList = pluginService.listAll();
//用eventbus发布插件更新事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
//获取选择器数据(查库)
List<SelectorData> selectorDataList = selectorService.listAll();
//用eventbus发布选择器更新事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
//获取规则数据(查库)
List<RuleData> ruleDataList = ruleService.listAll();
//用eventbus发布规则更新事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
//元数据同步(查库得到元数据相关信息,并通过 eventbus发布更新事件)
metaDataService.syncData();
return true;
}
}
上面SyncDataServiceImpl代码我给出了详细注释,appAuthService和metaDataService也是查库并发布事件逻辑,我就不贴代码了。它们的逻辑是运用观察者模式,具体是通过查库然后用eventbus发布事件将数据分发出去,那么就必然有一个事件监听来处理这些数据。它的事件监听类是DataChangedEventDispatcher。我们接着看它的源码:
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
……
private List<DataChangedListener> listeners;
/**
* 这里就是我们的事件监听类具体监听方法,所有前面通过eventbus发布的事件,都通过这里来处理
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
//根据我们发布事件时传递的gourpkey来处理是规则、认证、选择器、元数据更新处理
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
…………
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
…………
}
}
}
@Override
public void afterPropertiesSet() {
// 这里通过我们spring的上下文获取我们刚通过DataSyncConfiguration装配的DataChangedListener类型bean
Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
}
}
接着我们就来看我们上面装配好的ZookeeperDataChangedListener对象了,它具体来处理我们初始化的数据。整个数据初始化的逻辑是:
- 查库获取配置信息,
- 根据不同同步方式的分组用eventbus将数据发布出去
- 具体同步监听类处理具体的数据
下面是ZookeeperDataChangedListener部分核心源码,在此将数据存放到zookeeper中。
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
for (PluginData data : changed) {
final String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
// 删除zookeeper中对应的配置信息
if (eventType == DataEventTypeEnum.DELETE) {
deleteZkPathRecursive(pluginPath);
…………
continue;
}
//创建或更新zookeeper中对应的配置信息
upsertZkNode(pluginPath, data);
}
}
至此,我们分析完了shenyu-admin里通过zookeeper同步配置信息的核心源码。
shenyul网关zookeeper数据同步源码分析
下面我们分析shenyu网关这边对于用zookeeper同步数据的具体源码。
shenyu-bootstrap关于zookeeper同步数据的依赖是shenyu-spring-boot-starter-sync-data-zookeeper,该spring-boot-start项目的spring.factories信息为:
- org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.shenyu.shenyu.spring.boot.sync.data.zookeeper.ZookeeperSyncDataConfiguration
那我们接着来看ZookeeperSyncDataConfiguration类
@Configuration
@ConditionalOnClass(ZookeeperSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url")
@EnableConfigurationProperties(ZookeeperConfig.class)
@Slf4j
public class ZookeeperSyncDataConfiguration {
@Bean
public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use zookeeper sync shenyu data.......");
return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
………………
}
注解说明就不在赘述,就是一些配置开关,满足了条件才装配该类。该类装配ZookeeperSyncDataService这个类。我们接下来看该类核心源码:
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {
…………
/**
* 构造函数里监听zookeeper里配置信息变化
*/
public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
…………
//监听zookeeper相关变化
watcherData();
watchAppAuth();
watchMetaData();
}
……
//zookeeperClient监听选择器路径数据变化(插件、元数据、认证信息等逻辑一致,不贴源码)
private void subscribeSelectorDataChanges(final String path) {
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(final String dataPath, final Object data) {
//在zookeeper 选择器路径数据变化后,更新对应的信息
cacheSelectorData((SelectorData) data);
}
@Override
public void handleDataDeleted(final String dataPath) {
//在zookeeper 选择器路径数据变化后,删除对应的信息
unCacheSelectorData(dataPath);
}
});
}
…………
//将shenyu-admin更新的zookeeper的配置信息发布到 CommonPluginDataSubscriber处理
private void cacheSelectorData(final SelectorData selectorData) {
Optional.ofNullable(selectorData)
.ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data))); //核心方法onSelectorSubscribe(data)
}
}
由ZookeeperSyncDataService核心源码分析,最终数据会交给CommonPluginDataSubscriber处理。下面我们继续分析CommonPluginDataSubscriber类:
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
…………
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
//将数据缓存到本地内存中(用的ConcurrentMap数据结构)
BaseDataCache.getInstance().cachePluginData(pluginData);
//将数据更新到验证规则时的单例对象
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
}
…………
});
}
}
CommonPluginDataSubscriber类中涉及到的本地缓存类BaseDataCache,里面就是用Map缓存的数据:
//基本数据缓存类
public final class BaseDataCache {
……
/**
* pluginName -> PluginData.
*/
private static final ConcurrentMap<String, PluginData> PLUGIN_MAP = Maps.newConcurrentMap();
……
public void cachePluginData(final PluginData pluginData) {
//将数据存到Map中
Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.put(data.getName(), data));
}
}
CommonPluginDataSubscriber类中涉及到的网关具体插件的handle类,会将数据更新到网关执行时查询规则的单例类(以waf插件举例),当更新了这个数据后,下次请求就会去验证最新规则,达成热更新。
//具体插件处理更新到数据(当前以waf插件为例)
public class WafPluginDataHandler implements PluginDataHandler {
@Override
public void handlerPlugin(final PluginData pluginData) {
WafConfig wafConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), WafConfig.class);
//添加数据到单例对象中的map(每次请求去验证规则的时候就取自map中的数据,从而完成热更新)
Singleton.INST.single(WafConfig.class, wafConfig);
}
…………
}
//单例对象,验证规则时在这里面取规则数据
public enum Singleton {
…………
private static final Map<String, Object> SINGLES = new ConcurrentHashMap<>();
//新增配置数据(或覆盖)
public void single(final Class clazz, final Object o) {
SINGLES.put(clazz.getName(), o);
}
//获取配置数据
@SuppressWarnings("unchecked")
public <T> T get(final Class<T> clazz) {
return (T) SINGLES.get(clazz.getName());
}
}
总结
至此,shenyu网关通过zookeeper更新数据整个源码分析完毕。主要逻辑为,shenyu-admin启动时会从数据库里拉取最新插件、认证、选择器等配置信息并存到zookeeper中,当用户在shenyu-admin前台修改了相关配置,就同步到zookeeper中;shenyu-bootstrap启动后会根据zookeeper配置信息,通过zkClient监听 zookeeper中对应path下的数据变化,有数据变化时就将对应数据更新到内存中,下次请求就会根据最新规则去执行网关逻辑。