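When data is updated in the admin console, how does the updated data get synchronized to the gateway?
ShenYu supports several synchronization methods; this article analyzes the WebSocket one.
Admin data synchronization
Starting from SelectorController
An admin console like this one typically performs one full data synchronization at startup and then synchronizes incrementally whenever something is modified. So, among admin's controllers, we can find the createSelector method of SelectorController and start the analysis from there.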
@PostMapping("")
public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) {
Integer createCount = selectorService.createOrUpdate(selectorDTO);
return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);
}
The implementation is as follows:
@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {
// The eventPublisher responsible for publishing events
private final ApplicationEventPublisher eventPublisher;
@Override
@Transactional(rollbackFor = Exception.class)
public int createOrUpdate(final SelectorDTO selectorDTO) {
int selectorCount;
// Build the data: DTO --> DO
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
// Decide whether this is an insert or an update
if (StringUtils.isEmpty(selectorDTO.getId())) {
// Insert the selector
selectorCount = selectorMapper.insertSelective(selectorDO);
// Insert the selector's conditions
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
// check selector add
// Permission check
if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
dataPermissionDTO.setDataId(selectorDO.getId());
dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
}
} else {
// Update the data: delete the old conditions first, then add the new ones
selectorCount = selectorMapper.updateSelective(selectorDO);
//delete rule condition then add
selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
selectorConditionMapper.insertSelective(selectorConditionDO);
});
}
// Publish the event
publishEvent(selectorDO, selectorConditionDTOs);
// Update the upstream
updateDivideUpstream(selectorDO);
return selectorCount;
}
// ......
}
Browsing the code above, we guess that the method related to synchronization is the one that publishes the event.
Publishing the event
The event is published by publishEvent(selectorDO, selectorConditionDTOs);
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
    PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
    List<ConditionData> conditionDataList = selectorConditionDTOs.stream()
            .map(ConditionTransfer.INSTANCE::mapToSelectorDTO)
            .collect(Collectors.toList());
    // publish change event.
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
            Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}
eventPublisher is an org.springframework.context.ApplicationEventPublisher, so the synchronization must be built on Spring's event publishing mechanism.
Spring's event publishing mechanism needs a publisher, a listener, and an event. We have already found the publisher, and from the source above we know that the event is DataChangedEvent.
public class DataChangedEvent extends ApplicationEvent {
private final DataEventTypeEnum eventType;
private final ConfigGroupEnum groupKey;
public DataChangedEvent(final ConfigGroupEnum groupKey, final DataEventTypeEnum type, final List<?> source) {
super(source.stream().filter(Objects::nonNull).collect(Collectors.toList()));
this.eventType = type;
this.groupKey = groupKey;
}
}
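The three roles (publisher, listener, event) can be illustrated without Spring. The following is a minimal plain-Java sketch, not ShenYu's or Spring's code: SketchEvent, SketchPublisher, and EventSketchDemo are hypothetical names, and the publisher simply calls each registered listener in turn.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event: carries a group key, an event type, and the changed data.
final class SketchEvent {
    final String groupKey;
    final String eventType;
    final List<?> source;

    SketchEvent(String groupKey, String eventType, List<?> source) {
        this.groupKey = groupKey;
        this.eventType = eventType;
        this.source = source;
    }
}

// Hypothetical publisher: keeps a list of listeners and hands each event to all of them.
final class SketchPublisher {
    private final List<Consumer<SketchEvent>> listeners = new ArrayList<>();

    void register(Consumer<SketchEvent> listener) {
        listeners.add(listener);
    }

    void publishEvent(SketchEvent event) {
        listeners.forEach(listener -> listener.accept(event));
    }
}

final class EventSketchDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        SketchPublisher publisher = new SketchPublisher();
        // The listener records what it was told about.
        publisher.register(e -> received.add(e.groupKey + ":" + e.eventType + ":" + e.source.size()));
        publisher.publishEvent(new SketchEvent("SELECTOR", "UPDATE", Collections.singletonList("selector-1")));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [SELECTOR:UPDATE:1]
    }
}
```

In the admin code the same roles are played by ApplicationEventPublisher, DataChangedEvent, and a class implementing ApplicationListener.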
ConfigGroupEnum
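From ConfigGroupEnum we can see that the data admin may need to synchronize with the gateway covers plugins, rules, selectors, metadata, and authentication information.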
public enum ConfigGroupEnum {
APP_AUTH,
PLUGIN,
RULE,
SELECTOR,
META_DATA;
public static ConfigGroupEnum acquireByName(final String name) {
return Arrays.stream(ConfigGroupEnum.values())
.filter(e -> Objects.equals(e.name(), name))
.findFirst().orElseThrow(() -> new ShenyuException(String.format(" this ConfigGroupEnum can not support %s", name)));
}
}
DataEventTypeEnum
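From DataEventTypeEnum we can see which events may trigger synchronization: delete, create, update, refresh, and the initial full synchronization (we guess that MYSELF marks the full synchronization).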
public enum DataEventTypeEnum {
DELETE,
CREATE,
UPDATE,
REFRESH,
MYSELF;
public static DataEventTypeEnum acquireByName(final String name) {
return Arrays.stream(DataEventTypeEnum.values())
.filter(e -> Objects.equals(e.name(), name))
.findFirst()
.orElseThrow(() -> new ShenyuException(String.format(" this DataEventTypeEnum can not support %s", name)));
}
}
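Both enums are looked up by name because the constants travel as plain strings and are mapped back on the receiving side. A small sketch of that round trip follows; SketchEventType and EnumLookupDemo are hypothetical, and IllegalArgumentException stands in for ShenyuException.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical enum that mirrors the acquireByName lookup style shown above.
enum SketchEventType {
    DELETE, CREATE, UPDATE, REFRESH, MYSELF;

    static SketchEventType acquireByName(String name) {
        return Arrays.stream(values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("unsupported event type: " + name));
    }
}

final class EnumLookupDemo {
    // Returns true when the lookup rejects the given name.
    static boolean rejects(String name) {
        try {
            SketchEventType.acquireByName(name);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // The sender transmits name(); the receiver maps the string back to the constant.
        String wire = SketchEventType.MYSELF.name();
        System.out.println(SketchEventType.acquireByName(wire)); // MYSELF
        System.out.println(rejects("myself"));                   // true, the match is case-sensitive
    }
}
```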
DataChangedEventDispatcher
A listener normally implements the ApplicationListener interface, so a global search turns up the following class:
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    private final ApplicationContext applicationContext;
    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}
Continuing on, we find that DataChangedListener has several implementing classes, one per synchronization method.
Configuration parsing
With so many implementations, how does admin choose between them? Recall that we configured the synchronization properties when starting the project.
Searching for "shenyu.sync.websocket" leads us to WebsocketSyncProperties:
@ConfigurationProperties(prefix = "shenyu.sync.websocket")
public class WebsocketSyncProperties {
private boolean enabled = true;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(final boolean enabled) {
this.enabled = enabled;
}
}
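However, this properties class only controls whether websocket is enabled; it does not register WebsocketDataChangedListener or any other communication-related bean. Following the usages of WebsocketSyncProperties, we find the class below, which is exactly what we are looking for.
@Configuration
// This configuration takes effect when shenyu.sync.websocket.enabled=true
// It defaults to true (matchIfMissing = true)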
@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {
@Bean
@ConditionalOnMissingBean(WebsocketDataChangedListener.class)
public DataChangedListener websocketDataChangedListener() {
return new WebsocketDataChangedListener();
}
@Bean
@ConditionalOnMissingBean(WebsocketCollector.class)
public WebsocketCollector websocketCollector() {
return new WebsocketCollector();
}
@Bean
@ConditionalOnMissingBean(ServerEndpointExporter.class)
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
Back to the listener: here we follow WebsocketDataChangedListener.
public class WebsocketDataChangedListener implements DataChangedListener {
@Override
public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
// Assemble the WebsocketData payload
WebsocketData<SelectorData> websocketData =
new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
// Send it
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
}
Now let's look at the WebsocketCollector.send method (WebsocketCollector is one of the beans registered above):
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {
private static final Logger LOG = LoggerFactory.getLogger(WebsocketCollector.class);
private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
private static final String SESSION_KEY = "sessionKey";
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isBlank(message)) {
return;
}
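// This indirectly supports our guess that DataEventTypeEnum.MYSELF means full synchronization:
// the message goes only to the single websocket session stored for the current thread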
if (DataEventTypeEnum.MYSELF == type) {
Session session = (Session) ThreadLocalUtils.get(SESSION_KEY);
if (Objects.nonNull(session)) {
sendMessageBySession(session, message);
}
} else {
SESSION_SET.forEach(session -> sendMessageBySession(session, message));
}
}
private static synchronized void sendMessageBySession(final Session session, final String message) {
try {
// Send the message through the websocket session
session.getBasicRemote().sendText(message);
} catch (IOException e) {
LOG.error("websocket send result is exception: ", e);
}
}
// ......
}
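The routing rule in send (reply to one session for MYSELF, broadcast otherwise) can be sketched in isolation. This is a simplified illustration, not the real class: FakeSession and SendRoutingSketch are hypothetical, and it assumes the handler that received the request stored the requesting session in a thread-local beforehand, a step that is elided in the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for a websocket session: it only records what was sent to it.
final class FakeSession {
    final List<String> sent = new ArrayList<>();
}

final class SendRoutingSketch {
    static final Set<FakeSession> SESSIONS = new CopyOnWriteArraySet<>();
    // Assumption: holds the session whose request is being handled on this thread.
    static final ThreadLocal<FakeSession> CURRENT = new ThreadLocal<>();

    static void send(String message, String eventType) {
        if (message == null || message.trim().isEmpty()) {
            return;
        }
        if ("MYSELF".equals(eventType)) {
            // Full sync: reply only to the session that asked for it.
            FakeSession session = CURRENT.get();
            if (session != null) {
                session.sent.add(message);
            }
        } else {
            // Incremental change: broadcast to every connected session.
            SESSIONS.forEach(s -> s.sent.add(message));
        }
    }

    public static void main(String[] args) {
        FakeSession a = new FakeSession();
        FakeSession b = new FakeSession();
        SESSIONS.add(a);
        SESSIONS.add(b);

        CURRENT.set(a);
        send("full-data", "MYSELF");
        CURRENT.remove();
        send("selector-change", "UPDATE");

        System.out.println(a.sent); // [full-data, selector-change]
        System.out.println(b.sent); // [selector-change]
    }
}
```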
Gateway data synchronization
ShenyuWebsocketClient
Where data is sent, something has to receive it. Under shenyu-sync-data-websocket we can find the ShenyuWebsocketClient class, which extends WebSocketClient.
public final class ShenyuWebsocketClient extends WebSocketClient {
private static final Logger LOG = LoggerFactory.getLogger(ShenyuWebsocketClient.class);
private volatile boolean alreadySync = Boolean.FALSE;
private final WebsocketDataHandler websocketDataHandler;
public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
super(serverUri);
this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
}
@Override
public void onOpen(final ServerHandshake serverHandshake) {
// If no synchronization has happened yet, request one when the connection opens
if (!alreadySync) {
// Further evidence that MYSELF is the full-synchronization marker
send(DataEventTypeEnum.MYSELF.name());
alreadySync = true;
}
}
@Override
public void onMessage(final String result) {
handleResult(result);
}
@Override
public void onClose(final int i, final String s, final boolean b) {
this.close();
}
@Override
public void onError(final Exception e) {
this.close();
}
@SuppressWarnings("ALL")
private void handleResult(final String result) {
LOG.info("handleResult({})", result);
WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
String eventType = websocketData.getEventType();
String json = GsonUtils.getInstance().toJson(websocketData.getData());
websocketDataHandler.executor(groupEnum, json, eventType);
}
}
WebsocketDataHandler
Looking through ShenyuWebsocketClient, the key call is websocketDataHandler.executor(groupEnum, json, eventType);. Its implementation is as follows:
public class WebsocketDataHandler {
private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
}
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
ENUM_MAP.get(type).handle(json, eventType);
}
}
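As a side note on the pattern WebsocketDataHandler uses, the two-step dispatch (pick a handler by group from an EnumMap, then pick an operation by event type) can be sketched as follows. Group, SketchHandler, and DispatchSketch are hypothetical names, and the handlers return a description string where the real ones would update caches.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical subset of the configuration groups.
enum Group { PLUGIN, SELECTOR, RULE }

// Hypothetical handler contract: receives the raw JSON and the event type name.
interface SketchHandler {
    String handle(String json, String eventType);
}

final class DispatchSketch {
    private final Map<Group, SketchHandler> handlers = new EnumMap<>(Group.class);

    DispatchSketch() {
        // One handler per group, registered once at construction time.
        for (Group g : Group.values()) {
            handlers.put(g, (json, eventType) -> g + " " + operationFor(eventType) + " " + json);
        }
    }

    // Second step: several event types share one operation.
    static String operationFor(String eventType) {
        switch (eventType) {
            case "REFRESH":
            case "MYSELF":
                return "refresh";
            case "UPDATE":
            case "CREATE":
                return "update";
            case "DELETE":
                return "delete";
            default:
                return "ignored";
        }
    }

    // First step: look the handler up by group.
    String executor(Group type, String json, String eventType) {
        return handlers.get(type).handle(json, eventType);
    }

    public static void main(String[] args) {
        DispatchSketch dispatch = new DispatchSketch();
        System.out.println(dispatch.executor(Group.SELECTOR, "[]", "CREATE")); // SELECTOR update []
        System.out.println(dispatch.executor(Group.RULE, "[]", "MYSELF"));     // RULE refresh []
    }
}
```

An EnumMap suits this lookup because the set of keys is fixed by the enum.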
WebsocketDataHandler keeps five handlers in memory, one for each ConfigGroupEnum value. Next, the following call is made:
ENUM_MAP.get(type).handle(json, eventType);
AbstractDataHandler
The dispatch by event type happens in the abstract class AbstractDataHandler:
public abstract class AbstractDataHandler<T> implements DataHandler {
    @Override
    public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
        switch (eventTypeEnum) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }
}
Because we added a selector, SelectorDataHandler.doUpdate is what eventually runs:
@Override
protected void doUpdate(final List<SelectorData> dataList) {
    dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
}
CommonPluginDataSubscriber
Looking up the onSelectorSubscribe method leads us to the CommonPluginDataSubscriber class:
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
@Override
public void onSelectorSubscribe(final SelectorData selectorData) {
subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}
}
The implementation of subscribeDataHandler:
// Subscribed-data handler
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        // Operations for plugins
        if (data instanceof PluginData) {
            PluginData pluginData = (PluginData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cachePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
            }
        // Operations for selectors
        } else if (data instanceof SelectorData) {
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                // Cache the data in memory
                BaseDataCache.getInstance().cacheSelectData(selectorData);
                // If the plugin has its own handling logic, run it as well
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
            }
        // Operations for rules
        } else if (data instanceof RuleData) {
            RuleData ruleData = (RuleData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
            }
        }
    });
}
BaseDataCache
The call BaseDataCache.getInstance().cacheSelectData(selectorData); stores the data in BaseDataCache, an in-memory singleton.
public final class BaseDataCache {
private static final BaseDataCache INSTANCE = new BaseDataCache();
private static final ConcurrentMap<String, PluginData> PLUGIN_MAP = Maps.newConcurrentMap();
private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
private static final ConcurrentMap<String, List<RuleData>> RULE_MAP = Maps.newConcurrentMap();
private BaseDataCache() {
}
public static BaseDataCache getInstance() {
return INSTANCE;
}
public void cachePluginData(final PluginData pluginData) {
Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.put(data.getName(), data));
}
public void removePluginData(final PluginData pluginData) {
Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.remove(data.getName()));
}
public void removePluginDataByPluginName(final String pluginName) {
PLUGIN_MAP.remove(pluginName);
}
public void cleanPluginData() {
PLUGIN_MAP.clear();
}
public void cleanPluginDataSelf(final List<PluginData> pluginDataList) {
pluginDataList.forEach(this::removePluginData);
}
public PluginData obtainPluginData(final String pluginName) {
return PLUGIN_MAP.get(pluginName);
}
public void cacheSelectData(final SelectorData selectorData) {
Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
}
public void removeSelectData(final SelectorData selectorData) {
Optional.ofNullable(selectorData).ifPresent(data -> {
final List<SelectorData> selectorDataList = SELECTOR_MAP.get(data.getPluginName());
Optional.ofNullable(selectorDataList).ifPresent(list -> list.removeIf(e -> e.getId().equals(data.getId())));
});
}
public void removeSelectDataByPluginName(final String pluginName) {
SELECTOR_MAP.remove(pluginName);
}
public void cleanSelectorData() {
SELECTOR_MAP.clear();
}
public void cleanSelectorDataSelf(final List<SelectorData> selectorDataList) {
selectorDataList.forEach(this::removeSelectData);
}
public List<SelectorData> obtainSelectorData(final String pluginName) {
return SELECTOR_MAP.get(pluginName);
}
public void cacheRuleData(final RuleData ruleData) {
Optional.ofNullable(ruleData).ifPresent(this::ruleAccept);
}
public void removeRuleData(final RuleData ruleData) {
Optional.ofNullable(ruleData).ifPresent(data -> {
final List<RuleData> ruleDataList = RULE_MAP.get(data.getSelectorId());
Optional.ofNullable(ruleDataList).ifPresent(list -> list.removeIf(rule -> rule.getId().equals(data.getId())));
});
}
public void removeRuleDataBySelectorId(final String selectorId) {
RULE_MAP.remove(selectorId);
}
public void cleanRuleData() {
RULE_MAP.clear();
}
public void cleanRuleDataSelf(final List<RuleData> ruleDataList) {
ruleDataList.forEach(this::removeRuleData);
}
public List<RuleData> obtainRuleData(final String selectorId) {
return RULE_MAP.get(selectorId);
}
private void ruleAccept(final RuleData data) {
String selectorId = data.getSelectorId();
synchronized (RULE_MAP) {
if (RULE_MAP.containsKey(selectorId)) {
List<RuleData> existList = RULE_MAP.get(selectorId);
final List<RuleData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
resultList.add(data);
final List<RuleData> collect = resultList.stream().sorted(Comparator.comparing(RuleData::getSort)).collect(Collectors.toList());
RULE_MAP.put(selectorId, collect);
} else {
RULE_MAP.put(selectorId, Lists.newArrayList(data));
}
}
}
private void selectorAccept(final SelectorData data) {
String key = data.getPluginName();
synchronized (SELECTOR_MAP) {
if (SELECTOR_MAP.containsKey(key)) {
List<SelectorData> existList = SELECTOR_MAP.get(key);
final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
resultList.add(data);
final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
SELECTOR_MAP.put(key, collect);
} else {
SELECTOR_MAP.put(key, Lists.newArrayList(data));
}
}
}
}
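The core of selectorAccept is "replace the entry with the same id, then keep the list ordered by sort". Here is a minimal sketch of that behavior. SketchSelector and SelectorCacheSketch are hypothetical, and the sketch swaps the synchronized block for ConcurrentMap.compute, so it illustrates the behavior and is not a copy of the original.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

// Hypothetical, trimmed-down selector: only the fields the cache logic touches.
final class SketchSelector {
    final String id;
    final String pluginName;
    final int sort;

    SketchSelector(String id, String pluginName, int sort) {
        this.id = id;
        this.pluginName = pluginName;
        this.sort = sort;
    }
}

final class SelectorCacheSketch {
    private final ConcurrentMap<String, List<SketchSelector>> byPlugin = new ConcurrentHashMap<>();

    // Drops any selector with the same id, adds the new one, and re-sorts by sort order.
    void cache(SketchSelector data) {
        byPlugin.compute(data.pluginName, (plugin, existing) -> {
            List<SketchSelector> result = new ArrayList<>();
            if (existing != null) {
                existing.stream().filter(s -> !s.id.equals(data.id)).forEach(result::add);
            }
            result.add(data);
            result.sort(Comparator.comparingInt((SketchSelector s) -> s.sort));
            return result;
        });
    }

    // Returns the cached selector ids for a plugin, in sort order.
    List<String> ids(String pluginName) {
        return byPlugin.getOrDefault(pluginName, Collections.emptyList())
                .stream().map(s -> s.id).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        SelectorCacheSketch cache = new SelectorCacheSketch();
        cache.cache(new SketchSelector("1", "divide", 2));
        cache.cache(new SketchSelector("2", "divide", 1));
        System.out.println(cache.ids("divide")); // [2, 1]
        // Updating selector 1 replaces the old entry instead of duplicating it.
        cache.cache(new SketchSelector("1", "divide", 0));
        System.out.println(cache.ids("divide")); // [1, 2]
    }
}
```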
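Configuration parsing
Next, we look at the gateway's configuration file.
Analyzing its shenyu.sync.websocket settings leads us to the following configuration class: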
public class WebsocketConfig {
private String urls;
public String getUrls() {
return urls;
}
public void setUrls(final String urls) {
this.urls = urls;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WebsocketConfig that = (WebsocketConfig) o;
return Objects.equals(urls, that.urls);
}
@Override
public int hashCode() {
return Objects.hash(urls);
}
@Override
public String toString() {
return "WebsocketConfig{"
+ "urls='"
+ urls
+ '\''
+ '}';
}
}
From WebsocketConfig we can find WebsocketSyncDataConfiguration, which registers two beans:
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
public class WebsocketSyncDataConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncDataConfiguration.class);
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
LOGGER.info("you use websocket sync shenyu data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
@Bean
@ConfigurationProperties(prefix = "shenyu.sync.websocket")
public WebsocketConfig websocketConfig() {
return new WebsocketConfig();
}
}
Next, let's look at the WebsocketSyncDataService class:
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(WebsocketSyncDataService.class);
private final List<WebSocketClient> clients = new ArrayList<>();
private final ScheduledThreadPoolExecutor executor;
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
// Split the admin urls on commas
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
// Create a scheduled thread pool sized by the number of urls
executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));
// Loop over the urls and create a client for each one
for (String url : urls) {
try {
clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
LOG.error("websocket url({}) is error", url, e);
}
}
try {
// Loop over the clients and connect each one
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
LOG.info("websocket connection is successful.....");
} else {
LOG.error("websocket connection is error.....");
}
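// Schedule a task that runs every 10 seconds
// It checks whether the websocket connection has dropped and, if so, tries to reconnect.
// If the connection is still open, it sends a ping as a ping-pong check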
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
LOG.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());
} else {
LOG.error("websocket reconnection server[{}] is error.....", client.getURI().toString());
}
} else {
client.sendPing();
LOG.debug("websocket send to [{}] ping message successful", client.getURI().toString());
}
} catch (InterruptedException e) {
LOG.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 10, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
LOG.info("websocket connection...exception....", e);
}
}
@Override
public void close() {
for (WebSocketClient client : clients) {
if (!client.isClosed()) {
client.close();
}
}
if (Objects.nonNull(executor)) {
executor.shutdown();
}
}
}
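The periodic task in the constructor boils down to one decision per run: reconnect if the connection is closed, otherwise ping. The sketch below isolates that decision so it can be exercised without a real websocket. SketchConnection, FlakyConnection, and HeartbeatSketch are hypothetical, and the demo uses a 100 ms period instead of the 10 seconds used above.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical connection abstraction; the real client is a WebSocketClient.
interface SketchConnection {
    boolean isClosed();
    boolean reconnect();
    void ping();
}

// Hypothetical test double: reconnecting always succeeds and reopens the connection.
final class FlakyConnection implements SketchConnection {
    volatile boolean closed;
    volatile int pings;

    FlakyConnection(boolean closed) {
        this.closed = closed;
    }

    @Override
    public boolean isClosed() {
        return closed;
    }

    @Override
    public boolean reconnect() {
        closed = false;
        return true;
    }

    @Override
    public void ping() {
        pings++;
    }
}

final class HeartbeatSketch {
    // One health check: reconnect when closed, otherwise send a ping.
    static String check(SketchConnection connection) {
        if (connection.isClosed()) {
            return connection.reconnect() ? "reconnected" : "reconnect-failed";
        }
        connection.ping();
        return "ping";
    }

    public static void main(String[] args) throws InterruptedException {
        FlakyConnection connection = new FlakyConnection(true);
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // The first run reconnects; later runs find the connection open and ping.
        executor.scheduleAtFixedRate(() -> System.out.println(check(connection)), 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        executor.shutdownNow();
    }
}
```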
Verifying the guess
Incremental data synchronization
Java line breakpoints:
CommonPluginDataSubscriber.java:154
CommonPluginDataSubscriber.java:141
CommonPluginDataSubscriber.java:96
SelectorDataHandler.java:50
AbstractDataHandler.java:77
WebsocketDataHandler.java:59
ShenyuWebsocketClient.java:89
WebsocketCollector.java:131
WebsocketDataChangedListener.java:48
DataChangedEventDispatcher.java:64
SelectorServiceImpl.java:318
SelectorServiceImpl.java:152
SelectorController.java:92
AbstractCircuitBreaker.java:123
[Figure: admin]
[Figure: gateway]
Full data synchronization
Java line breakpoints:
WebsocketCollector.java:86
ShenyuWebsocketClient.java:66
At startup
[Figure: gateway]
[Figure: admin]
Summary
The observed behavior matches our guess.



