容器启动时,创建 类型为 topic 的 Exchange , 名为 SpringCloudBus并且创建监听 SpringCloudBus 绑定的匿名队列 spring cloud bus Server Refresh
当请求 /actuator/busrefresh/{serviceName}:** 端点时:
RefreshBusEndpoint 发布 RefreshRemoteApplicationEvent 事件接着 RemoteApplicationEventListener 接收 RefreshRemoteApplicationEvent 事件并处理,发现是给自己的, 发送给 mq
mq 接收到消息后,BuConsumer 进行处理,BusConsumer 发现并不是给自己的请求,处理结束 仅接着 RefreshListener 也接收到 RefreshRemoteApplicationEvent, 发现不是给自己的,结束处理Bus Server 刷新结束 RefreshBusEndpoint
@Endpoint(id = "busrefresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {
public RefreshBusEndpoint(ApplicationEventPublisher publisher, String id, Destination.Factory destinationFactory) {
super(publisher, id, destinationFactory);
}
@WriteOperation
public void busRefreshWithDestination(@Selector(match = Match.ALL_REMAINING) String[] destinations) {
String destination = StringUtils.arrayToDelimitedString(destinations, ":");
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), getDestination(destination)));
}
@WriteOperation
public void busRefresh() {
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), getDestination(null)));
}
}
RemoteApplicationEventListener
public class RemoteApplicationEventListener implements ApplicationListener{ private final Log log = LogFactory.getLog(getClass()); private final ServiceMatcher serviceMatcher; private final BusBridge busBridge; public RemoteApplicationEventListener(ServiceMatcher serviceMatcher, BusBridge busBridge) { this.serviceMatcher = serviceMatcher; this.busBridge = busBridge; } @Override public void onApplicationEvent(RemoteApplicationEvent event) { if (this.serviceMatcher.isFromSelf(event) && !(event instanceof AckRemoteApplicationEvent)) { if (log.isDebugEnabled()) { log.debug("Sending remote event on bus: " + event); } // TODO: configurable mimetype? this.busBridge.send(event); } }
看 serviceMatcher.isFromSelf(event) 这个方法:
public class PathServiceMatcher implements ServiceMatcher {
private final PathMatcher matcher;
//omit..
public boolean isFromSelf(RemoteApplicationEvent event) {
//originService: cloud-config-server
String originService = event.getOriginService();
// serviceId: cloud-config-server:9090
String serviceId = getBusId();
//返回true
return this.matcher.match(originService, serviceId);
}
}
可以发现, Bus Server 发现这是给自己的请求,于是将消息通过 BusBridge 发送给 消息队列
RefreshListenerRefreshLisener 也是监听了 RemoteApplicationEvent 事件,所以它也要处理:
public class RefreshListener implements ApplicationListener{ private static Log log = LogFactory.getLog(RefreshListener.class); private ContextRefresher contextRefresher; private ServiceMatcher serviceMatcher; public RefreshListener(ContextRefresher contextRefresher, ServiceMatcher serviceMatcher) { this.contextRefresher = contextRefresher; this.serviceMatcher = serviceMatcher; } @Override public void onApplicationEvent(RefreshRemoteApplicationEvent event) { log.info("Received remote refresh request."); if (serviceMatcher.isForSelf(event)) { Set keys = this.contextRefresher.refresh(); log.info("Keys refreshed " + keys); } else { log.info("Refresh not performed, the event was targeting " + event.getDestinationService()); } } }
这个类的功能主要是:
判断是否给自己消息刷新上下文
重点在 serviceMatcher.isForSelf(event) 方法:
//PathServiceMatcher.java
public boolean isForSelf(RemoteApplicationEvent event) {
//destinationService: customer:**
//getBusId(): cloud-config-server:9090
String destinationService = event.getDestinationService();
if (destinationService == null || destinationService.trim().isEmpty()
|| this.matcher.match(destinationService, getBusId())) {
return true;
}
// Check all potential config names instead of service name
for (String configName : this.configNames) {
if (this.matcher.match(destinationService, configName)) {
return true;
}
}
return false;
}
很明显,这个消息 BusServer处理不了,destinationService 是 customer,而不是 cloud-config-server
BusConsumer紧接,由 BusBridge 发出的 RemoteApplicationEvent 事件由mq 转发,就被消费者 BusConsumer 收到:
public class BusConsumer implements Consumer{ private final Log log = LogFactory.getLog(getClass()); private final ApplicationEventPublisher publisher; private final ServiceMatcher serviceMatcher; private final ObjectProvider busBridge; private final BusProperties properties; private final Destination.Factory destinationFactory; public BusConsumer(ApplicationEventPublisher publisher, ServiceMatcher serviceMatcher, ObjectProvider busBridge, BusProperties properties, Destination.Factory destinationFactory) { this.publisher = publisher; this.serviceMatcher = serviceMatcher; this.busBridge = busBridge; this.properties = properties; this.destinationFactory = destinationFactory; } @Override public void accept(RemoteApplicationEvent event) { if (event instanceof AckRemoteApplicationEvent) { if (this.properties.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event) && this.publisher != null) { this.publisher.publishEvent(event); } // If it's an ACK we are finished processing at this point return; } if (log.isDebugEnabled()) { log.debug("Received remote event from bus: " + event); } if (this.serviceMatcher.isForSelf(event) && this.publisher != null) { if (!this.serviceMatcher.isFromSelf(event)) { this.publisher.publishEvent(event); } if (this.properties.getAck().isEnabled()) { AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getBusId(), destinationFactory.getDestination( event.getDestinationService(), event.getId(), event.getClass()); this.busBridge.ifAvailable(bridge -> bridge.send(ack)); this.publisher.publishEvent(ack); } } if (this.properties.getTrace().isEnabled() && this.publisher != null) { // We are set to register sent events so publish it for local consumption, // irrespective of the origin this.publisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(), event.getDestinationService(), event.getId(), event.getClass())); } } }
显然:
event 不是 AckRemoteApplicationEvent 事件serviceMatcher.isForSelf(event) 依然返回 false, 因为 destinationService 是 customer, 而不是 cloud-config-server
也就是说,当Bus Server 在执行刷新的端点时, 仅仅就是向mq发送了 一个RemoteApplicationEvent 事件,当事件的 destinationService 和本身的服务名称相匹配时,就可以执行真正的动态刷新配置了
Bus Server 刷新流程结束!



