Spring Cloud Bus: the Server refresh process


Bus Server startup process

When the container starts, it:

1. Creates an Exchange of type topic named springCloudBus.
2. Creates an anonymous queue bound to springCloudBus and listens on it.

Spring Cloud Bus Server refresh

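When the /actuator/busrefresh/{serviceName}:** endpoint is requested:

1. RefreshBusEndpoint publishes a RefreshRemoteApplicationEvent.
2. RemoteApplicationEventListener receives the RefreshRemoteApplicationEvent, finds that the event originated from this instance, and sends it to the MQ.
3. After the MQ receives and redelivers the message, BusConsumer processes it, finds that the request is not addressed to this instance, and stops.
4. RefreshListener also receives the RefreshRemoteApplicationEvent, finds that it is not addressed to this instance, and stops.
5. The Bus Server refresh is finished.

RefreshBusEndpoint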

@Endpoint(id = "busrefresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {

	public RefreshBusEndpoint(ApplicationEventPublisher publisher, String id, Destination.Factory destinationFactory) {
		super(publisher, id, destinationFactory);
	}

	@WriteOperation
	public void busRefreshWithDestination(@Selector(match = Match.ALL_REMAINING) String[] destinations) {
		String destination = StringUtils.arrayToDelimitedString(destinations, ":");
		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), getDestination(destination)));
	}

	@WriteOperation
	public void busRefresh() {
		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), getDestination(null)));
	}

}
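
As a rough illustration of how the selector segments become a destination, the sketch below mimics the join step in plain Java. The names DestinationJoinSketch and toDestination are made up for this example, and String.join stands in for StringUtils.arrayToDelimitedString. It assumes the path segments after /actuator/busrefresh/ arrive as separate array elements.

```java
class DestinationJoinSketch {

    // Joins the remaining path segments with ':' to build the destination string.
    // Stand-in for StringUtils.arrayToDelimitedString(destinations, ":").
    static String toDestination(String... segments) {
        return String.join(":", segments);
    }

    public static void main(String[] args) {
        // /actuator/busrefresh/customer:**  -> one segment
        System.out.println(toDestination("customer:**"));      // customer:**
        // /actuator/busrefresh/customer/8080 -> two segments
        System.out.println(toDestination("customer", "8080")); // customer:8080
    }
}
```

Under this assumption, both URL forms end up as a colon-separated destination that the service matcher can later compare against each instance's bus id.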

RemoteApplicationEventListener
public class RemoteApplicationEventListener implements ApplicationListener<RemoteApplicationEvent> {

    private final Log log = LogFactory.getLog(getClass());

    private final ServiceMatcher serviceMatcher;

    private final BusBridge busBridge;

    public RemoteApplicationEventListener(ServiceMatcher serviceMatcher, BusBridge busBridge) {
        this.serviceMatcher = serviceMatcher;
        this.busBridge = busBridge;
    }

    @Override
    public void onApplicationEvent(RemoteApplicationEvent event) {
        if (this.serviceMatcher.isFromSelf(event) && !(event instanceof AckRemoteApplicationEvent)) {
            if (log.isDebugEnabled()) {
                log.debug("Sending remote event on bus: " + event);
            }
            // TODO: configurable mimetype?
            this.busBridge.send(event);
        }
    }

}

Look at the serviceMatcher.isFromSelf(event) method:

public class PathServiceMatcher implements ServiceMatcher {

    private final PathMatcher matcher;

    //omit..

    public boolean isFromSelf(RemoteApplicationEvent event) {
        //originService: cloud-config-server
        String originService = event.getOriginService();
        // serviceId: cloud-config-server:9090
        String serviceId = getBusId();
        // returns true
        return this.matcher.match(originService, serviceId);
    }

}

So the Bus Server finds that the event originated from itself, and therefore sends the message to the message queue through BusBridge.
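
The forwarding rule can be tried out in isolation. The following is a minimal sketch, not the library code: ForwardingSketch and its nested Event class are hypothetical stand-ins, a plain list plays the role of BusBridge, and the origin check is reduced to string equality instead of path matching.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the Spring Cloud Bus types; not the library API.
class ForwardingSketch {

    static final class Event {
        final String originService;
        final boolean ack;

        Event(String originService, boolean ack) {
            this.originService = originService;
            this.ack = ack;
        }
    }

    private final String busId;

    // Plays the role of BusBridge: everything added here would go to the MQ.
    final List<Event> sentToBroker = new ArrayList<>();

    ForwardingSketch(String busId) {
        this.busId = busId;
    }

    // Same condition as the listener above: forward only locally created, non-ack events.
    // Assumption: origin matching is plain equality here.
    void onApplicationEvent(Event event) {
        boolean fromSelf = busId.equals(event.originService);
        if (fromSelf && !event.ack) {
            sentToBroker.add(event);
        }
    }

    public static void main(String[] args) {
        ForwardingSketch server = new ForwardingSketch("cloud-config-server:9090");
        server.onApplicationEvent(new Event("cloud-config-server:9090", false)); // forwarded
        server.onApplicationEvent(new Event("customer:8080", false));            // skipped: not from self
        server.onApplicationEvent(new Event("cloud-config-server:9090", true));  // skipped: ack
        System.out.println(server.sentToBroker.size()); // 1
    }
}
```

The point of the check is loop prevention: an event that arrived from the bus and was republished locally is not sent back to the MQ a second time.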

RefreshListener

RefreshListener also listens for this event (RefreshRemoteApplicationEvent, a subclass of RemoteApplicationEvent), so it handles it too:

public class RefreshListener implements ApplicationListener<RefreshRemoteApplicationEvent> {

    private static Log log = LogFactory.getLog(RefreshListener.class);

    private ContextRefresher contextRefresher;

    private ServiceMatcher serviceMatcher;

    public RefreshListener(ContextRefresher contextRefresher, ServiceMatcher serviceMatcher) {
        this.contextRefresher = contextRefresher;
        this.serviceMatcher = serviceMatcher;
    }

    @Override
    public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
        log.info("Received remote refresh request.");
        if (serviceMatcher.isForSelf(event)) {
            Set<String> keys = this.contextRefresher.refresh();
            log.info("Keys refreshed " + keys);
        }
        else {
            log.info("Refresh not performed, the event was targeting " + event.getDestinationService());
        }
    }

}

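This class mainly does two things:

1. Decides whether the message is addressed to this instance.
2. Refreshes the context if it is.

The key is the serviceMatcher.isForSelf(event) method: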

//PathServiceMatcher.java
public boolean isForSelf(RemoteApplicationEvent event) {
    //destinationService: customer:**
    //getBusId(): cloud-config-server:9090
    String destinationService = event.getDestinationService();
    if (destinationService == null || destinationService.trim().isEmpty()
        || this.matcher.match(destinationService, getBusId())) {
        return true;
    }

    // Check all potential config names instead of service name
    for (String configName : this.configNames) {
        if (this.matcher.match(destinationService, configName)) {
            return true;
        }
    }

    return false;
}

Clearly, the Bus Server does not act on this message: destinationService is customer, not cloud-config-server.
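
To make the destination check concrete, here is a minimal sketch of colon-separated pattern matching. It is an assumption-laden simplification: DestinationMatchSketch is a made-up class, it supports only whole-segment "*" and "**" wildcards rather than full Ant-style patterns, and it leaves out the configNames loop from the method above.

```java
class DestinationMatchSketch {

    // Simplified subset of Ant-style matching with ':' as the separator:
    // "*" matches exactly one segment, "**" matches zero or more segments.
    static boolean match(String pattern, String busId) {
        return match(pattern.split(":"), 0, busId.split(":"), 0);
    }

    private static boolean match(String[] p, int i, String[] s, int j) {
        if (i == p.length) {
            return j == s.length; // pattern exhausted: the id must be exhausted too
        }
        if (p[i].equals("**")) {
            // Let "**" consume 0..n of the remaining segments.
            for (int k = j; k <= s.length; k++) {
                if (match(p, i + 1, s, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == s.length) {
            return false; // pattern still expects a segment, id has none left
        }
        if (p[i].equals("*") || p[i].equals(s[j])) {
            return match(p, i + 1, s, j + 1);
        }
        return false;
    }

    // Mirrors the first branch of isForSelf: an empty destination means "everyone".
    static boolean isForSelf(String destinationService, String busId) {
        return destinationService == null || destinationService.trim().isEmpty()
                || match(destinationService, busId);
    }

    public static void main(String[] args) {
        System.out.println(isForSelf("customer:**", "cloud-config-server:9090")); // false
        System.out.println(isForSelf("customer:**", "customer:8080"));            // true
    }
}
```

With the values from the comments in the listing, the first segment already differs (customer versus cloud-config-server), so the server instance is rejected before the wildcard is even considered.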

BusConsumer

Next, the RemoteApplicationEvent sent by BusBridge is relayed by the MQ and received by the consumer, BusConsumer:

public class BusConsumer implements Consumer<RemoteApplicationEvent> {

    private final Log log = LogFactory.getLog(getClass());

    private final ApplicationEventPublisher publisher;

    private final ServiceMatcher serviceMatcher;

    private final ObjectProvider<BusBridge> busBridge;

    private final BusProperties properties;

    private final Destination.Factory destinationFactory;

    public BusConsumer(ApplicationEventPublisher publisher, ServiceMatcher serviceMatcher,
                       ObjectProvider<BusBridge> busBridge,
                       BusProperties properties, Destination.Factory destinationFactory) {
        this.publisher = publisher;
        this.serviceMatcher = serviceMatcher;
        this.busBridge = busBridge;
        this.properties = properties;
        this.destinationFactory = destinationFactory;
    }

    @Override
    public void accept(RemoteApplicationEvent event) {
        if (event instanceof AckRemoteApplicationEvent) {
            if (this.properties.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                && this.publisher != null) {
                this.publisher.publishEvent(event);
            }
            // If it's an ACK we are finished processing at this point
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Received remote event from bus: " + event);
        }

        if (this.serviceMatcher.isForSelf(event) && this.publisher != null) {
            if (!this.serviceMatcher.isFromSelf(event)) {
                this.publisher.publishEvent(event);
            }
            if (this.properties.getAck().isEnabled()) {
                AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getBusId(),
                        destinationFactory.getDestination(/* omit.. */),
                        event.getDestinationService(), event.getId(), event.getClass());
                this.busBridge.ifAvailable(bridge -> bridge.send(ack));
                this.publisher.publishEvent(ack);
            }
        }
        if (this.properties.getTrace().isEnabled() && this.publisher != null) {
            // We are set to register sent events so publish it for local consumption,
            // irrespective of the origin
            this.publisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(),
            event.getDestinationService(), event.getId(), event.getClass()));
        }
    }

}

Clearly:

1. event is not an AckRemoteApplicationEvent.
2. serviceMatcher.isForSelf(event) again returns false, because destinationService is customer, not cloud-config-server.
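
The branches of accept can be summarized as a small decision function. This is only a sketch: ConsumerDecisionSketch and the action labels are invented for illustration, the publisher null checks are left out, and the flags are passed in directly instead of being read from BusProperties and ServiceMatcher.

```java
import java.util.ArrayList;
import java.util.List;

class ConsumerDecisionSketch {

    // Returns, as labels, the actions accept(...) would take for one incoming event.
    static List<String> decide(boolean isAck, boolean forSelf, boolean fromSelf,
                               boolean ackEnabled, boolean traceEnabled) {
        List<String> actions = new ArrayList<>();
        if (isAck) {
            if (traceEnabled && !fromSelf) {
                actions.add("publish-ack-locally");
            }
            return actions; // an ack is never processed further
        }
        if (forSelf) {
            if (!fromSelf) {
                actions.add("publish-event-locally"); // this is what reaches RefreshListener
            }
            if (ackEnabled) {
                actions.add("send-ack");
            }
        }
        if (traceEnabled) {
            actions.add("publish-sent-event");
        }
        return actions;
    }

    public static void main(String[] args) {
        // Bus Server: event came from itself and targets customer, trace assumed off.
        System.out.println(decide(false, false, true, true, false)); // []
        // A customer instance: event is for it and came from elsewhere.
        System.out.println(decide(false, true, false, true, false)); // [publish-event-locally, send-ack]
    }
}
```

Assuming tracing is disabled, the Bus Server case produces no action at all, which is why its processing simply ends.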

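In other words, when the refresh endpoint runs on the Bus Server, all the server does is send a RemoteApplicationEvent to the MQ. Only an instance whose own service name matches the event's destinationService performs the actual dynamic configuration refresh.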
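
The whole flow can be simulated without a broker. The sketch below is a toy model under stated assumptions: BusFlowSketch, Node, and Broker are hypothetical names, the broker delivers every message to every subscriber (as a topic exchange with one anonymous queue per instance would), and destination matching is reduced to three simple cases.

```java
import java.util.ArrayList;
import java.util.List;

class BusFlowSketch {

    static final class Node {
        final String busId;
        int refreshCount = 0;

        Node(String busId) {
            this.busId = busId;
        }

        // Simplified check: "**" targets everyone, "name:**" targets a service by name,
        // anything else must equal the bus id. The real matcher is more general.
        boolean isForSelf(String destination) {
            if (destination.equals("**")) {
                return true;
            }
            if (destination.endsWith(":**")) {
                String prefix = destination.substring(0, destination.length() - 2); // keeps the ':'
                return busId.startsWith(prefix);
            }
            return busId.equals(destination);
        }

        // Counts a refresh only when the event is addressed to this node.
        void accept(String destination) {
            if (isForSelf(destination)) {
                refreshCount++;
            }
        }
    }

    static final class Broker {
        final List<Node> subscribers = new ArrayList<>();

        // Fan-out: every subscriber, including the sender, gets the message.
        void publish(String destination) {
            for (Node node : subscribers) {
                node.accept(destination);
            }
        }
    }

    public static void main(String[] args) {
        Node server = new Node("cloud-config-server:9090");
        Node customer = new Node("customer:8080");
        Broker broker = new Broker();
        broker.subscribers.add(server);
        broker.subscribers.add(customer);

        broker.publish("customer:**"); // what the server sends after busrefresh/customer:**

        System.out.println(server.refreshCount);   // 0
        System.out.println(customer.refreshCount); // 1
    }
}
```

The server receives its own message back but ignores it, while the customer instance is the only one that refreshes.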

This completes the Bus Server refresh flow.

Please credit the source when reposting: this article is from www.mshxw.com
Article URL: https://www.mshxw.com/it/774849.html