
ShenYu Gateway Data Synchronization: A Source Code Analysis


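Contents

Admin data synchronization
    Starting from SelectorController
    Publishing the event
    ConfigGroupEnum
    DataEventTypeEnum
    DataChangedEventDispatcher
    Configuration analysis

Gateway data synchronization
    ShenyuWebsocketClient
    WebsocketDataHandler
    AbstractDataHandler
    CommonPluginDataSubscriber
    BaseDataCache
    Configuration analysis

Verifying the hypothesis
    Incremental data synchronization
    Full data synchronization

Summary
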
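When data is updated in the admin console, how does the updated data reach the gateway?

ShenYu supports several synchronization methods. This article analyzes the WebSocket one.

Admin data synchronization

Starting from SelectorController

A back-office system such as admin typically synchronizes all of its data once at startup and then synchronizes incrementally whenever something changes. So, among the admin controllers, we locate the createSelector method of SelectorController and begin the analysis there.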

@PostMapping("")
public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) {
    Integer createCount = selectorService.createOrUpdate(selectorDTO);
    return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);
}

The service implementation is as follows:

@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {
    // the eventPublisher is responsible for publishing events
    private final ApplicationEventPublisher eventPublisher;
    
    @Override
    @Transactional(rollbackFor = Exception.class)
    public int createOrUpdate(final SelectorDTO selectorDTO) {
        int selectorCount;
        // build the data object: DTO --> DO
        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
        // an empty id means insert; otherwise update
        if (StringUtils.isEmpty(selectorDTO.getId())) {
            // insert the selector
            selectorCount = selectorMapper.insertSelective(selectorDO);
            // insert the selector's conditions
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
            });
            // check selector add
            // permission check
            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
                dataPermissionDTO.setDataId(selectorDO.getId());
                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
            }

        } else {
            // update: the conditions are deleted first and then re-inserted
            selectorCount = selectorMapper.updateSelective(selectorDO);
            //delete rule condition then add
            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
                selectorConditionMapper.insertSelective(selectorConditionDO);
            });
        }
        // publish the change event
        publishEvent(selectorDO, selectorConditionDTOs);

        // update the upstream
        updateDivideUpstream(selectorDO);
        return selectorCount;
    }
    
    
    // ......
    
}

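Looking through the code above, the call most likely to be related to synchronization is the one that publishes the event:

publishEvent(selectorDO, selectorConditionDTOs);

Publishing the event
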
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
    PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
    List<ConditionData> conditionDataList =
            selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
    // publish change event.
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
            Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}

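eventPublisher is an org.springframework.context.ApplicationEventPublisher, so the notification must go through Spring's event publishing mechanism.

Spring's event mechanism needs three parts: a publisher, a listener, and an event. We have already found the publisher, and the source above tells us that the event is DataChangedEvent.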

public class DataChangedEvent extends ApplicationEvent {
    private final DataEventTypeEnum eventType;

    private final ConfigGroupEnum groupKey;
    
    public DataChangedEvent(final ConfigGroupEnum groupKey, final DataEventTypeEnum type, final List<?> source) {
        super(source.stream().filter(Objects::nonNull).collect(Collectors.toList()));
        this.eventType = type;
        this.groupKey = groupKey;
    }
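}

ConfigGroupEnum

From ConfigGroupEnum we can see the kinds of data that admin may need to synchronize with the gateway: plugins, rules, selectors, metadata, and authentication information.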

public enum ConfigGroupEnum {

    APP_AUTH,

    PLUGIN,

    RULE,

    SELECTOR,

    META_DATA;

    
    public static ConfigGroupEnum acquireByName(final String name) {
        return Arrays.stream(ConfigGroupEnum.values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst().orElseThrow(() -> new ShenyuException(String.format(" this ConfigGroupEnum can not support %s", name)));
    }
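}

DataEventTypeEnum

From DataEventTypeEnum we can see which events may trigger synchronization: delete, create, update, refresh, and the initial full synchronization (our guess is that MYSELF marks a full sync).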

public enum DataEventTypeEnum {

    DELETE,

    CREATE,

    UPDATE,

    REFRESH,

    MYSELF;

    
    public static DataEventTypeEnum acquireByName(final String name) {
        return Arrays.stream(DataEventTypeEnum.values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst()
                .orElseThrow(() -> new ShenyuException(String.format(" this DataEventTypeEnum can not support %s", name)));
    }
}

DataChangedEventDispatcher

Listeners usually implement the ApplicationListener interface, so a global search leads us to the following class:

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private final ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}
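
The dispatching idea can be illustrated without Spring. The following is a minimal sketch, not ShenYu's code: Group, ChangeEvent, ChangeListener, SimpleDispatcher and RecordingListener are hypothetical names invented for this example, and the payloads are plain strings. It only shows how one event is routed to the matching callback of every registered listener.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for ConfigGroupEnum, DataChangedEvent and DataChangedListener.
enum Group { PLUGIN, SELECTOR }

final class ChangeEvent {
    final Group group;
    final List<String> data;

    ChangeEvent(final Group group, final List<String> data) {
        this.group = group;
        this.data = data;
    }
}

interface ChangeListener {
    void onPluginChanged(List<String> data);

    void onSelectorChanged(List<String> data);
}

// Routes each event to the listener callback that matches its group.
final class SimpleDispatcher {
    private final List<ChangeListener> listeners = new ArrayList<>();

    void register(final ChangeListener listener) {
        listeners.add(listener);
    }

    void dispatch(final ChangeEvent event) {
        for (ChangeListener listener : listeners) {
            switch (event.group) {
                case PLUGIN:
                    listener.onPluginChanged(event.data);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(event.data);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.group);
            }
        }
    }
}

// Records which callback was invoked, so the routing can be observed.
final class RecordingListener implements ChangeListener {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onPluginChanged(final List<String> data) {
        calls.add("plugin:" + data);
    }

    @Override
    public void onSelectorChanged(final List<String> data) {
        calls.add("selector:" + data);
    }
}
```

Registering two RecordingListener instances and dispatching one SELECTOR event leaves one "selector:..." entry in each listener, which mirrors how every DataChangedListener bean receives the same change.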
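
Configuration analysis

Following the call further, we find that DataChangedListener has several implementations.

With so many implementations, how does admin choose one? Recall that a synchronization property was configured when the project was started.

Searching for "shenyu.sync.websocket" leads us to WebsocketSyncProperties: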

@ConfigurationProperties(prefix = "shenyu.sync.websocket")
public class WebsocketSyncProperties {

    
    private boolean enabled = true;

    
    public boolean isEnabled() {
        return enabled;
    }

    
    public void setEnabled(final boolean enabled) {
        this.enabled = enabled;
    }
}

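However, this properties class only carries the flag that enables WebSocket; it does not register WebsocketDataChangedListener or any other communication-related bean. Following the usages of WebsocketSyncProperties leads to the class below, which is exactly what we are looking for: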

@Configuration
// this configuration applies when shenyu.sync.websocket.enabled=true
// the default is true (matchIfMissing = true)
@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {

    
    @Bean
    @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
    public DataChangedListener websocketDataChangedListener() {
        return new WebsocketDataChangedListener();
    }

    
    @Bean
    @ConditionalOnMissingBean(WebsocketCollector.class)
    public WebsocketCollector websocketCollector() {
        return new WebsocketCollector();
    }

    
    @Bean
    @ConditionalOnMissingBean(ServerEndpointExporter.class)
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
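
The condition on this configuration class decides whether the WebSocket beans exist at all. As a rough illustration only, the sketch below approximates how such a property condition evaluates. PropertyCondition and its matches method are hypothetical names for this example and are not Spring APIs; the real evaluation is performed by Spring Boot.

```java
import java.util.Map;

// Hypothetical helper that approximates the outcome of
// @ConditionalOnProperty(name = ..., havingValue = ..., matchIfMissing = ...).
final class PropertyCondition {

    private PropertyCondition() {
    }

    static boolean matches(final Map<String, String> properties, final String name,
                           final String havingValue, final boolean matchIfMissing) {
        String actual = properties.get(name);
        if (actual == null) {
            // property not configured: the result is whatever matchIfMissing says
            return matchIfMissing;
        }
        // property configured: it has to equal the required value
        return havingValue.equalsIgnoreCase(actual);
    }
}
```

With an empty property map and matchIfMissing set to true the condition matches, which is why WebSocket synchronization is active in admin unless shenyu.sync.websocket.enabled is explicitly set to another value.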

Back to the listeners: here we pick WebsocketDataChangedListener.

public class WebsocketDataChangedListener implements DataChangedListener {
  @Override
  public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
      // assemble the WebsocketData payload
      WebsocketData<SelectorData> websocketData =
              new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
      // send it over WebSocket
      WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
  }
}

Now let us look at the WebsocketCollector.send method (WebsocketCollector is also one of the beans registered above):

@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {

    private static final Logger LOG = LoggerFactory.getLogger(WebsocketCollector.class);

    private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

    private static final String SESSION_KEY = "sessionKey";
    
    
    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isBlank(message)) {
            return;
        }
        // this indirectly supports our guess that DataEventTypeEnum.MYSELF marks a full sync:
        // the message goes only to the WebSocket session stored for the current thread
        if (DataEventTypeEnum.MYSELF == type) {
            Session session = (Session) ThreadLocalUtils.get(SESSION_KEY);
            if (Objects.nonNull(session)) {
                sendMessageBySession(session, message);
            }
        } else {
            SESSION_SET.forEach(session -> sendMessageBySession(session, message));
        }

    }

    private static synchronized void sendMessageBySession(final Session session, final String message) {
        try {
            // send the message through the WebSocket session
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            LOG.error("websocket send result is exception: ", e);
        }
    }
    
    // ......
}
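
The difference between the two branches of send can be sketched in isolation. This is a simplified model, not ShenYu's code: FakeSession, EventType and BroadcastCollector are hypothetical names, and the onMessage handler is an assumption, since the article does not show how the requesting session ends up in the thread-local.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-ins for DataEventTypeEnum and the WebSocket Session.
enum EventType { UPDATE, MYSELF }

final class FakeSession {
    final List<String> received = new ArrayList<>();

    void sendText(final String message) {
        received.add(message);
    }
}

final class BroadcastCollector {
    private final Set<FakeSession> sessions = new CopyOnWriteArraySet<>();
    private final ThreadLocal<FakeSession> requester = new ThreadLocal<>();

    void onOpen(final FakeSession session) {
        sessions.add(session);
    }

    // Assumed behavior: a client that sends "MYSELF" gets the snapshot back, and nobody else does.
    void onMessage(final FakeSession session, final String message, final String snapshot) {
        if (EventType.MYSELF.name().equals(message)) {
            requester.set(session);
            try {
                send(snapshot, EventType.MYSELF);
            } finally {
                requester.remove();
            }
        }
    }

    void send(final String message, final EventType type) {
        if (message == null || message.trim().isEmpty()) {
            return;
        }
        if (type == EventType.MYSELF) {
            // full sync: reply only to the session that asked
            FakeSession session = requester.get();
            if (session != null) {
                session.sendText(message);
            }
        } else {
            // incremental change: broadcast to every connected session
            sessions.forEach(session -> session.sendText(message));
        }
    }
}
```

In this model an UPDATE reaches every open session, while a MYSELF reply reaches only the session that requested it, which is the behavior the comments in WebsocketCollector.send point at.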

Gateway data synchronization

ShenyuWebsocketClient

Where data is sent, it must also be received. Under shenyu-sync-data-websocket we find the ShenyuWebsocketClient class, which extends WebSocketClient.

public final class ShenyuWebsocketClient extends WebSocketClient {

    
    private static final Logger LOG = LoggerFactory.getLogger(ShenyuWebsocketClient.class);

    private volatile boolean alreadySync = Boolean.FALSE;

    private final WebsocketDataHandler websocketDataHandler;

    
    public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                                 final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        super(serverUri);
        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
    }

    @Override
    public void onOpen(final ServerHandshake serverHandshake) {
        // if no sync has happened yet, request one when the connection opens
        if (!alreadySync) {
            // further evidence that MYSELF is the full-sync marker
            send(DataEventTypeEnum.MYSELF.name());
            alreadySync = true;
        }
    }

    @Override
    public void onMessage(final String result) {
        handleResult(result);
    }

    @Override
    public void onClose(final int i, final String s, final boolean b) {
        this.close();
    }

    @Override
    public void onError(final Exception e) {
        this.close();
    }

    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        LOG.info("handleResult({})", result);
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}

WebsocketDataHandler

Looking through ShenyuWebsocketClient, the key call is websocketDataHandler.executor(groupEnum, json, eventType). Its implementation is as follows:

public class WebsocketDataHandler {

    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }

    
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }
}

The class registers five handlers in an in-memory map, one for each ConfigGroupEnum value. It then makes this call:

ENUM_MAP.get(type).handle(json, eventType);
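
The lookup-then-delegate pattern can be shown on its own. The sketch below is illustrative only: ConfigGroup, GroupHandler and HandlerRegistry are hypothetical names, and the explicit check for a missing handler is an addition of this example (the original call would fail with a NullPointerException in that case).

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-ins for ConfigGroupEnum and DataHandler.
enum ConfigGroup { PLUGIN, SELECTOR, RULE }

interface GroupHandler {
    void handle(String json, String eventType);
}

// Maps each group to exactly one handler and delegates to it.
final class HandlerRegistry {
    private final Map<ConfigGroup, GroupHandler> handlers = new EnumMap<>(ConfigGroup.class);

    void register(final ConfigGroup group, final GroupHandler handler) {
        handlers.put(group, handler);
    }

    void execute(final ConfigGroup group, final String json, final String eventType) {
        GroupHandler handler = handlers.get(group);
        if (handler == null) {
            // added in this sketch: fail with a clear message instead of a NullPointerException
            throw new IllegalArgumentException("no handler registered for " + group);
        }
        handler.handle(json, eventType);
    }
}
```

An EnumMap suits this kind of registry because its keys are enum constants, so lookups are array-indexed rather than hash-based.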

AbstractDataHandler

The dispatch by event type happens in the abstract class AbstractDataHandler:

public abstract class AbstractDataHandler<T> implements DataHandler {
  @Override
  public void handle(final String json, final String eventType) {
      List<T> dataList = convert(json);
      if (CollectionUtils.isEmpty(dataList)) {
          return;
      }
      DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
      switch (eventTypeEnum) {
          case REFRESH:
          case MYSELF:
              doRefresh(dataList);
              break;
          case UPDATE:
          case CREATE:
              doUpdate(dataList);
              break;
          case DELETE:
              doDelete(dataList);
              break;
          default:
              break;
      }
  }
}
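
This is the template method pattern: the base class fixes the dispatch, and subclasses supply the three operations. A minimal sketch follows, assuming hypothetical names (ChangeType, TemplateHandler, InMemoryHandler) and a deliberately simple subclass that keeps strings in a set. What refresh does here, replacing the whole store, is an assumption of this example and not taken from ShenYu.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for DataEventTypeEnum.
enum ChangeType { DELETE, CREATE, UPDATE, REFRESH, MYSELF }

abstract class TemplateHandler<T> {

    // The fixed part: skip empty input, then dispatch on the event type.
    final void handle(final List<T> dataList, final String eventType) {
        if (dataList == null || dataList.isEmpty()) {
            return;
        }
        switch (ChangeType.valueOf(eventType)) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }

    protected abstract void doRefresh(List<T> dataList);

    protected abstract void doUpdate(List<T> dataList);

    protected abstract void doDelete(List<T> dataList);
}

// The variable part: one concrete handler that stores strings in memory.
final class InMemoryHandler extends TemplateHandler<String> {
    final Set<String> store = new TreeSet<>();

    @Override
    protected void doRefresh(final List<String> dataList) {
        store.clear();
        store.addAll(dataList);
    }

    @Override
    protected void doUpdate(final List<String> dataList) {
        store.addAll(dataList);
    }

    @Override
    protected void doDelete(final List<String> dataList) {
        store.removeAll(dataList);
    }
}
```

Note that CREATE and UPDATE share a branch, and so do REFRESH and MYSELF, exactly as in the switch above; an empty list returns before any branch runs.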

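We created a selector, and publishEvent sends it as an UPDATE event (CREATE and UPDATE share a branch in any case), so the call ends up in SelectorDataHandler.doUpdate: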

@Override
protected void doUpdate(final List<SelectorData> dataList) {
    dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
}

CommonPluginDataSubscriber

Searching for the onSelectorSubscribe method leads to the CommonPluginDataSubscriber class:

public class CommonPluginDataSubscriber implements PluginDataSubscriber {
  @Override
  public void onSelectorSubscribe(final SelectorData selectorData) {
      subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
  }
}

The implementation of subscribeDataHandler:

// handle subscribed data
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        if (data instanceof PluginData) {
            // plugin data
            PluginData pluginData = (PluginData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cachePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
            }
        } else if (data instanceof SelectorData) {
            // selector data
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                // cache the data in memory
                BaseDataCache.getInstance().cacheSelectData(selectorData);
                // if the plugin has its own handling logic, run it as well
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
            }
        } else if (data instanceof RuleData) {
            // rule data
            RuleData ruleData = (RuleData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
            }
        }
    });
}

BaseDataCache

The call BaseDataCache.getInstance().cacheSelectData(selectorData) stores the data in BaseDataCache, an in-memory singleton.

public final class BaseDataCache {

    private static final BaseDataCache INSTANCE = new BaseDataCache();

    private static final ConcurrentMap<String, PluginData> PLUGIN_MAP = Maps.newConcurrentMap();

    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();

    private static final ConcurrentMap<String, List<RuleData>> RULE_MAP = Maps.newConcurrentMap();

    private BaseDataCache() {
    }

    public static BaseDataCache getInstance() {
        return INSTANCE;
    }
    
    
    public void cachePluginData(final PluginData pluginData) {
        Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.put(data.getName(), data));
    }
    
    
    public void removePluginData(final PluginData pluginData) {
        Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.remove(data.getName()));
    }
    
    
    public void removePluginDataByPluginName(final String pluginName) {
        PLUGIN_MAP.remove(pluginName);
    }
    
    
    public void cleanPluginData() {
        PLUGIN_MAP.clear();
    }
    
    
    public void cleanPluginDataSelf(final List<PluginData> pluginDataList) {
        pluginDataList.forEach(this::removePluginData);
    }
    
    
    public PluginData obtainPluginData(final String pluginName) {
        return PLUGIN_MAP.get(pluginName);
    }
    
    
    public void cacheSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
    }
    
    
    public void removeSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(data -> {
            final List<SelectorData> selectorDataList = SELECTOR_MAP.get(data.getPluginName());
            Optional.ofNullable(selectorDataList).ifPresent(list -> list.removeIf(e -> e.getId().equals(data.getId())));
        });
    }
    
    
    public void removeSelectDataByPluginName(final String pluginName) {
        SELECTOR_MAP.remove(pluginName);
    }
    
    
    public void cleanSelectorData() {
        SELECTOR_MAP.clear();
    }
    
    
    public void cleanSelectorDataSelf(final List<SelectorData> selectorDataList) {
        selectorDataList.forEach(this::removeSelectData);
    }

    public List<SelectorData> obtainSelectorData(final String pluginName) {
        return SELECTOR_MAP.get(pluginName);
    }
    
    
    public void cacheRuleData(final RuleData ruleData) {
        Optional.ofNullable(ruleData).ifPresent(this::ruleAccept);
    }
    
    
    public void removeRuleData(final RuleData ruleData) {
        Optional.ofNullable(ruleData).ifPresent(data -> {
            final List<RuleData> ruleDataList = RULE_MAP.get(data.getSelectorId());
            Optional.ofNullable(ruleDataList).ifPresent(list -> list.removeIf(rule -> rule.getId().equals(data.getId())));
        });
    }
    
    
    public void removeRuleDataBySelectorId(final String selectorId) {
        RULE_MAP.remove(selectorId);
    }
    
    
    public void cleanRuleData() {
        RULE_MAP.clear();
    }
    
    
    public void cleanRuleDataSelf(final List<RuleData> ruleDataList) {
        ruleDataList.forEach(this::removeRuleData);
    }

    public List<RuleData> obtainRuleData(final String selectorId) {
        return RULE_MAP.get(selectorId);
    }

    
    private void ruleAccept(final RuleData data) {
        String selectorId = data.getSelectorId();
        synchronized (RULE_MAP) {
            if (RULE_MAP.containsKey(selectorId)) {
                List<RuleData> existList = RULE_MAP.get(selectorId);
                final List<RuleData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
                resultList.add(data);
                final List<RuleData> collect = resultList.stream().sorted(Comparator.comparing(RuleData::getSort)).collect(Collectors.toList());
                RULE_MAP.put(selectorId, collect);
            } else {
                RULE_MAP.put(selectorId, Lists.newArrayList(data));
            }
        }
    }

    
    private void selectorAccept(final SelectorData data) {
        String key = data.getPluginName();
        synchronized (SELECTOR_MAP) {
            if (SELECTOR_MAP.containsKey(key)) {
                List<SelectorData> existList = SELECTOR_MAP.get(key);
                final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
                resultList.add(data);
                final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
                SELECTOR_MAP.put(key, collect);
            } else {
                SELECTOR_MAP.put(key, Lists.newArrayList(data));
            }
        }
    }
}
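
The core of selectorAccept and ruleAccept is an upsert into a list that stays sorted: drop any entry with the same id, add the new one, sort again. The sketch below reproduces that idea under stated assumptions. CachedSelector and SortedCache are hypothetical names, and the example uses ConcurrentMap.compute in place of the synchronized block, which is a swapped-in technique and not what BaseDataCache does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified stand-in for SelectorData.
final class CachedSelector {
    final String id;
    final String pluginName;
    final int sort;

    CachedSelector(final String id, final String pluginName, final int sort) {
        this.id = id;
        this.pluginName = pluginName;
        this.sort = sort;
    }
}

final class SortedCache {
    private final ConcurrentMap<String, List<CachedSelector>> byPlugin = new ConcurrentHashMap<>();

    // Upsert: replace any entry with the same id, then keep the list ordered by sort.
    void cache(final CachedSelector selector) {
        byPlugin.compute(selector.pluginName, (key, existing) -> {
            List<CachedSelector> result = new ArrayList<>();
            if (existing != null) {
                for (CachedSelector old : existing) {
                    if (!old.id.equals(selector.id)) {
                        result.add(old);
                    }
                }
            }
            result.add(selector);
            result.sort(Comparator.comparingInt(item -> item.sort));
            return result;
        });
    }

    List<CachedSelector> obtain(final String pluginName) {
        return byPlugin.getOrDefault(pluginName, Collections.emptyList());
    }
}
```

Caching a selector whose id is already present therefore updates it in place and may move it within the list, instead of creating a duplicate.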

Configuration analysis

Next, we look at the gateway's configuration file.

From that configuration we can find the following configuration class:

public class WebsocketConfig {
    
    
    private String urls;

    
    public String getUrls() {
        return urls;
    }

    
    public void setUrls(final String urls) {
        this.urls = urls;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WebsocketConfig that = (WebsocketConfig) o;
        return Objects.equals(urls, that.urls);
    }

    @Override
    public int hashCode() {
        return Objects.hash(urls);
    }

    @Override
    public String toString() {
        return "WebsocketConfig{"
                + "urls='"
                + urls
                + '\''
                + '}';
    }
}

Following WebsocketConfig leads to WebsocketSyncDataConfiguration, which registers two beans:

@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
public class WebsocketSyncDataConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncDataConfiguration.class);

    
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        LOGGER.info("you use websocket sync shenyu data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    
    @Bean
    @ConfigurationProperties(prefix = "shenyu.sync.websocket")
    public WebsocketConfig websocketConfig() {
        return new WebsocketConfig();
    }

}

Next, let us look at the WebsocketSyncDataService class:

public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
    
    private static final Logger LOG = LoggerFactory.getLogger(WebsocketSyncDataService.class);
    private final List<WebSocketClient> clients = new ArrayList<>();
    private final ScheduledThreadPoolExecutor executor;
    
    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        // split the admin URLs on commas
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        // create a scheduled thread pool with one thread per URL
        executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));
        // create one client per URL
        for (String url : urls) {
            try {
                clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                LOG.error("websocket url({}) is error", url, e);
            }
        }
        try {
            // connect every client
            for (WebSocketClient client : clients) {
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    LOG.info("websocket connection is successful.....");
                } else {
                    LOG.error("websocket connection is error.....");
                }
                // schedule a task that runs every 10 seconds:
                // if the WebSocket connection has been closed, try to reconnect;
                // otherwise run a ping-pong check
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                LOG.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());
                            } else {
                                LOG.error("websocket reconnection server[{}] is error.....", client.getURI().toString());
                            }
                        } else {
                            client.sendPing();
                            LOG.debug("websocket send to [{}] ping message successful", client.getURI().toString());
                        }
                    } catch (InterruptedException e) {
                        LOG.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            LOG.info("websocket connection...exception....", e);
        }
    }
    @Override
    public void close() {
        for (WebSocketClient client : clients) {
            if (!client.isClosed()) {
                client.close();
            }
        }
        if (Objects.nonNull(executor)) {
            executor.shutdown();
        }
    }
}
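
The periodic task in the constructor boils down to "reconnect if closed, otherwise ping". A minimal sketch of that check, separated from any real WebSocket library, might look like this. Connection, HealthCheck and FakeConnection are hypothetical names for this example, and the schedule helper is only one possible way to wire the check into an executor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical abstraction over the three client calls used by the scheduled task.
interface Connection {
    boolean isClosed();

    boolean reconnect();

    void sendPing();
}

final class HealthCheck implements Runnable {
    private final Connection connection;

    HealthCheck(final Connection connection) {
        this.connection = connection;
    }

    @Override
    public void run() {
        if (connection.isClosed()) {
            // connection lost: try to re-establish it
            connection.reconnect();
        } else {
            // connection alive: probe it with a ping
            connection.sendPing();
        }
    }

    // Runs the check repeatedly on a daemon thread, starting after one period.
    static ScheduledExecutorService schedule(final Connection connection, final long periodSeconds) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "health-check");
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleAtFixedRate(new HealthCheck(connection), periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }
}

// In-memory connection used to observe what the check does.
final class FakeConnection implements Connection {
    boolean closed;
    int pings;
    int reconnects;

    @Override
    public boolean isClosed() {
        return closed;
    }

    @Override
    public boolean reconnect() {
        reconnects++;
        closed = false;
        return true;
    }

    @Override
    public void sendPing() {
        pings++;
    }
}
```

One point worth keeping in mind with scheduleAtFixedRate: if a run throws an exception that the task does not catch, later runs are suppressed, so a task of this kind should catch whatever its connection calls can throw.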
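
Verifying the hypothesis

Incremental data synchronization

Java line breakpoints:

CommonPluginDataSubscriber.java:154
CommonPluginDataSubscriber.java:141
CommonPluginDataSubscriber.java:96
SelectorDataHandler.java:50
AbstractDataHandler.java:77
WebsocketDataHandler.java:59
ShenyuWebsocketClient.java:89
WebsocketCollector.java:131
WebsocketDataChangedListener.java:48
DataChangedEventDispatcher.java:64
SelectorServiceImpl.java:318
SelectorServiceImpl.java:152
SelectorController.java:92
AbstractCircuitBreaker.java:123

[Screenshot: admin]

[Screenshot: gateway]

Full data synchronization

Java line breakpoints:

WebsocketCollector.java:86
ShenyuWebsocketClient.java:66

At startup:

[Screenshot: gateway]

[Screenshot: admin]

Summary

Debugging matches the hypothesis: a change in admin is published as a DataChangedEvent and pushed over WebSocket into the gateway's BaseDataCache, and the MYSELF message that the gateway sends when its connection opens triggers the full synchronization.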

If you repost this article, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/731418.html