Apache ShenYu 网关数据同步之zookeeper

说明

  • 本文将分析shenyu-admin与shenyu网关通过zookeeper数据同步的底层原理

源码分析

shenyu-admin与shenyul网关通过zookeeper方式数据同步,它们两端都应该有对应处理逻辑的源码。我们首先来看shenyu-admin这端的

shenyu-admin端zookeeper数据同步源码分析

从上文里我们得知,shenyu-admin关于数据同步的类是DataSyncConfiguration,那么我们现在来关注这里面关于zookeeper的代码

@Configuration
@ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {

    @Bean
    @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
    public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
        return new ZookeeperDataChangedListener(zkClient);
    }

    @Bean
    @ConditionalOnMissingBean(ZookeeperDataInit.class)
    public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        return new ZookeeperDataInit(zkClient, syncDataService);
    }
}

源码的注释已经很明晰:

  • @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url"): 只有配置了shenyu.sync.zookeeper.url属性,才装配改类
  • ZookeeperDataChangedListener:zookeeper数据变动监听对象
  • ZookeeperDataInit:zookeeper数据初始化数据类

所以我们要开启zookeeper同步方式,要在yml配置文件里加上上述zookeeper配置

我们进一步分析ZookeeperDataInit对象,初始化数据做了什么。我们创建ZookeeperDataInit对象时,给该对象构造注入了zookeeper客户端对象以及异步数据同步对象,SyncDataService的实现类是SyncDataServiceImpl。我们现在就看下ZookeeperDataInit源码

public class ZookeeperDataInit implements CommandLineRunner {

    ……

    //该类实现了CommandLineRunner,当该对象被spring装配后就会执行run方法
    @Override
    public void run(final String... args) {
        String pluginPath = ZkPathConstants.PLUGIN_PARENT;
        String authPath = ZkPathConstants.APP_AUTH_PARENT;
        String metaDataPath = ZkPathConstants.META_DATA;
        if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
            //数据同步,传递了REFRESH 刷新枚举值
            syncDataService.syncAll(DataEventTypeEnum.REFRESH);
        }
    }
}

SyncDataServiceImpl源码分析:

@Service("syncDataService")
public class SyncDataServiceImpl implements SyncDataService {
  @Override
    public boolean syncAll(final DataEventTypeEnum type) {
        //认证相关信息同步(查库得到认证相关信息,并通过 eventbus发布更新事件)
        appAuthService.syncData(); 
        //获取插件数据(查库)
        List<PluginData> pluginDataList = pluginService.listAll();
        //用eventbus发布插件更新事件
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
        //获取选择器数据(查库)
        List<SelectorData> selectorDataList = selectorService.listAll();
        //用eventbus发布选择器更新事件
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
        //获取规则数据(查库)
        List<RuleData> ruleDataList = ruleService.listAll();
        //用eventbus发布规则更新事件
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
        //元数据同步(查库得到元数据相关信息,并通过 eventbus发布更新事件)
        metaDataService.syncData();
        return true;
    }
}

上面SyncDataServiceImpl代码我给出了详细注释,appAuthService和metaDataService也是查库并发布事件逻辑,我就不贴代码了。它们的逻辑是运用观察者模式,具体是通过查库然后用eventbus发布事件将数据分发出去,那么就必然有一个事件监听来处理这些数据。它的事件监听类是DataChangedEventDispatcher。我们接着看它的源码:

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    ……
    private List<DataChangedListener> listeners;

    /**
    * 这里就是我们的事件监听类具体监听方法,所有前面通过eventbus发布的事件,都通过这里来处理
    */
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        //根据我们发布事件时传递的gourpkey来处理是规则、认证、选择器、元数据更新处理
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                …………
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                …………
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        // 这里通过我们spring的上下文获取我们刚通过DataSyncConfiguration装配的DataChangedListener类型bean
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}

接着我们就来看我们上面装配好的ZookeeperDataChangedListener对象了,它具体来处理我们初始化的数据。整个数据初始化的逻辑是:

  • 查库获取配置信息,
  • 根据不同同步方式的分组用eventbus将数据发布出去
  • 具体同步监听类处理具体的数据

下面是ZookeeperDataChangedListener部分核心源码,在此将数据存放到zookeeper中。

   @Override
    public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
        for (PluginData data : changed) {
            final String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
            // 删除zookeeper中对应的配置信息
            if (eventType == DataEventTypeEnum.DELETE) {
                deleteZkPathRecursive(pluginPath);
                …………
                continue;
            }
            //创建或更新zookeeper中对应的配置信息
            upsertZkNode(pluginPath, data);
        }
    }

至此,我们分析完了shenyu-admin里通过zookeeper同步配置信息的核心源码。

shenyul网关zookeeper数据同步源码分析

下面我们分析shenyu网关这边对于用zookeeper同步数据的具体源码。
shenyu-bootstrap关于zookeeper同步数据的依赖是shenyu-spring-boot-starter-sync-data-zookeeper,该spring-boot-start项目的spring.factories信息为:

  • org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.apache.shenyu.shenyu.spring.boot.sync.data.zookeeper.ZookeeperSyncDataConfiguration

那我们接着来看ZookeeperSyncDataConfiguration类

@Configuration
@ConditionalOnClass(ZookeeperSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url")
@EnableConfigurationProperties(ZookeeperConfig.class)
@Slf4j
public class ZookeeperSyncDataConfiguration {

    @Bean
    public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use zookeeper sync shenyu data.......");
        return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    ………………

}

注解说明就不在赘述,就是一些配置开关,满足了条件才装配该类。该类装配ZookeeperSyncDataService这个类。我们接下来看该类核心源码:

public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {
    …………
    /**
    * 构造函数里监听zookeeper里配置信息变化
    */
    public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        …………
        //监听zookeeper相关变化
        watcherData();
        watchAppAuth();
        watchMetaData();
    }

    ……

    //zookeeperClient监听选择器路径数据变化(插件、元数据、认证信息等逻辑一致,不贴源码)
    private void subscribeSelectorDataChanges(final String path) {
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(final String dataPath, final Object data) {
                //在zookeeper 选择器路径数据变化后,更新对应的信息
                cacheSelectorData((SelectorData) data);
            }

            @Override
            public void handleDataDeleted(final String dataPath) {
               //在zookeeper 选择器路径数据变化后,删除对应的信息
                unCacheSelectorData(dataPath);
            }
        });
    }

    …………
    //将shenyu-admin更新的zookeeper的配置信息发布到 CommonPluginDataSubscriber处理
    private void cacheSelectorData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData)
                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data))); //核心方法onSelectorSubscribe(data)
    }

} 

由ZookeeperSyncDataService核心源码分析,最终数据会交给CommonPluginDataSubscriber处理。下面我们继续分析CommonPluginDataSubscriber类:

public class CommonPluginDataSubscriber implements PluginDataSubscriber {
    …………

    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
                PluginData pluginData = (PluginData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    //将数据缓存到本地内存中(用的ConcurrentMap数据结构)
                    BaseDataCache.getInstance().cachePluginData(pluginData);
                    //将数据更新到验证规则时的单例对象
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
                }
             …………   
        });
    }    
}

CommonPluginDataSubscriber类中涉及到的本地缓存类BaseDataCache,里面就是用Map缓存的数据:

//基本数据缓存类
public final class BaseDataCache {
  ……
    /**
    * pluginName -> PluginData.
    */
    private static final ConcurrentMap<String, PluginData> PLUGIN_MAP = Maps.newConcurrentMap();
    ……
    public void cachePluginData(final PluginData pluginData) {
        //将数据存到Map中
        Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.put(data.getName(), data));
    }
}

CommonPluginDataSubscriber类中涉及到的网关具体插件的handle类,会将数据更新到网关执行时查询规则的单例类(以waf插件举例),当更新了这个数据后,下次请求就会去验证最新规则,达成热更新。

//具体插件处理更新到数据(当前以waf插件为例)
public class WafPluginDataHandler implements PluginDataHandler {

    @Override
    public void handlerPlugin(final PluginData pluginData) {
        WafConfig wafConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), WafConfig.class);
        //添加数据到单例对象中的map(每次请求去验证规则的时候就取自map中的数据,从而完成热更新)
        Singleton.INST.single(WafConfig.class, wafConfig);
   }
   …………  
}

//单例对象,验证规则时在这里面取规则数据
public enum Singleton {
    …………
    private static final Map<String, Object> SINGLES = new ConcurrentHashMap<>();

    //新增配置数据(或覆盖)
    public void single(final Class clazz, final Object o) {
        SINGLES.put(clazz.getName(), o);
    }

    //获取配置数据
    @SuppressWarnings("unchecked")
    public <T> T get(final Class<T> clazz) {
        return (T) SINGLES.get(clazz.getName());
    }
}

总结

至此,shenyu网关通过zookeeper更新数据整个源码分析完毕。主要逻辑为,shenyu-admin启动时会从数据库里拉取最新插件、认证、选择器等配置信息并存到zookeeper中,当用户在shenyu-admin前台修改了相关配置,就同步到zookeeper中;shenyu-bootstrap启动后会根据zookeeper配置信息,通过zkClient监听 zookeeper中对应path下的数据变化,有数据变化时就将对应数据更新到内存中,下次请求就会根据最新规则去执行网关逻辑。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注