@RabbitListener(queues = "activity_queue" ,containerFactory = "simpleRabbitListenerContainerFactory")
rabbitmq 执行流程
RabbitListenerEndpointRegistrar 实现 InitializingBean 接口,启动会自动被调用
org.springframework.beans.factory.InitializingBean#afterPropertiesSet方法 @Override
public void afterPropertiesSet() {
registerAllEndpoints();
}
注册所有的端点
protected void registerAllEndpoints() {
Assert.state(this.endpointRegistry != null, "No registry available");
synchronized (this.endpointDescriptors) {
for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
if (descriptor.endpoint instanceof MultiMethodRabbitListenerEndpoint && this.validator != null) {
((MultiMethodRabbitListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
}
this.endpointRegistry.registerListenerContainer(// NOSonAR never null
descriptor.endpoint,
resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}
org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar#resolveContainerFactory
private RabbitListenerContainerFactory> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
if (descriptor.containerFactory != null) {
return descriptor.containerFactory;
}
else if (this.containerFactory != null) {
return this.containerFactory;
}
else if (this.containerFactoryBeanName != null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
// 得到注入容器中的 containerFactory
this.containerFactory = this.beanFactory.getBean(
this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
return this.containerFactory; // Consider changing this if live change of the factory is required
}
else {
throw new IllegalStateException("Could not resolve the " +
RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
descriptor.endpoint + "] no factory was given and no default is set.");
}
}
spring执行到 org.springframework.context.support.AbstractApplicationContext#finishRefresh 方法
protected void finishRefresh() {
// Clear context-level resource caches (such as ASM metadata from scanning).
clearResourceCaches();
// Initialize lifecycle processor for this context.
initLifecycleProcessor();
// 这一步
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
// Publish the final event.
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
if (!NativeDetector.inNativeImage()) {
LiveBeansView.registerApplicationContext(this);
}
}
org.springframework.context.support.DefaultLifecycleProcessor#startBeans
private void startBeans(boolean autoStartupOnly) {
Map lifecycleBeans = getLifecycleBeans();
Map phases = new TreeMap<>();
lifecycleBeans.forEach((beanName, bean) -> {
if (!autoStartuponly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
int phase = getPhase(bean);
phases.computeIfAbsent(
phase,
p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly)
).add(beanName, bean);
}
});
if (!phases.isEmpty()) {
//执行start
phases.values().forEach(LifecycleGroup::start);
}
}
org.springframework.context.support.DefaultLifecycleProcessor.LifecycleGroup#start
public void start() {
if (this.members.isEmpty()) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Starting beans in phase " + this.phase);
}
Collections.sort(this.members);
for (LifecycleGroupMember member : this.members) {
doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
}
}
org.springframework.context.support.DefaultLifecycleProcessor#doStart
private void doStart(MaplifecycleBeans, String beanName, boolean autoStartupOnly) { Lifecycle bean = lifecycleBeans.remove(beanName); if (bean != null && bean != this) { String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName); for (String dependency : dependenciesForBean) { doStart(lifecycleBeans, dependency, autoStartupOnly); } if (!bean.isRunning() && (!autoStartuponly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) { if (logger.isTraceEnabled()) { logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]"); } try { // 调用start bean.start(); } catch (Throwable ex) { throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex); } if (logger.isDebugEnabled()) { logger.debug("Successfully started bean '" + beanName + "'"); } } } }
执行 RabbitListenerEndpointRegistry 的start方法
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}
org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry#startIfNecessary
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
// simpleRabbitListenerContainerFactory SimpleMessageListenerContainer
listenerContainer.start();
}
}
SimpleRabbitListenerContainer
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
// 执行父类 AbstractMessageListenerContainer 的start方法
public void start() {
if (isRunning()) {
return;
}
if (!this.initialized) {
synchronized (this.lifecycleMonitor) {
if (!this.initialized) {
afterPropertiesSet();
}
}
}
try {
logger.debug("Starting Rabbit listener container.");
configureAdminIfNeeded();
checkMismatchedQueues();
doStart();
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
finally {
this.lazyLoad = false;
}
}
}
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#checkMismatchedQueues
protected void checkMismatchedQueues() {
if (this.mismatchedQueuesFatal && this.amqpAdmin != null) {
try {
// 初始化
this.amqpAdmin.initialize();
}
catch (AmqpConnectException e) {
logger.info("Broker not available; cannot check queue declarations");
}
catch (AmqpIOException e) {
if (RabbitUtils.isMismatchedQueueArgs(e)) {
throw new FatalListenerStartupException("Mismatched queues", e);
}
else {
logger.info("Failed to get connection during start(): " + e);
}
}
}
else {
try {
Connection connection = getConnectionFactory().createConnection(); // NOSONAR
if (connection != null) {
connection.close();
}
}
catch (Exception e) {
logger.info("Broker not available; cannot force queue declarations during start: " + e.getMessage());
}
}
}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart
protected void doStart() {
Assert.state(!this.consumerBatchEnabled || getMessageListener() instanceof BatchMessageListener
|| getMessageListener() instanceof ChannelAwareBatchMessageListener,
"When setting 'consumerBatchEnabled' to true, the listener must support batching");
checkListenerContainerAware();
super.doStart();
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " +
"(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
Set processors = new HashSet();
// 关键代码
for (BlockingQueueConsumer consumer : this.consumers) {
//
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
// 提交任务执行
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
waitForConsumersToStart(processors);
}
}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run
public void run() { // NOSonAR - line count
if (!isActive()) {
return;
}
boolean aborted = false;
this.consumer.setLocallyTransacted(isChannelLocallyTransacted());
String routingLookupKey = getRoutingLookupKey();
if (routingLookupKey != null) {
SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSonAR both never null
}
if (this.consumer.getQueueCount() < 1) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer stopping; no queues for " + this.consumer);
}
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
}
this.start.countDown();
return;
}
try {
initialize();
// 循环等待
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
}
catch (InterruptedException e) {
logger.debug("Consumer thread interrupted, processing stopped.");
Thread.currentThread().interrupt();
aborted = true;
publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
}
catch (QueuesNotAvailableException ex) {
logger.error("Consumer threw missing queues exception, fatal=" + isMissingQueuesFatal(), ex);
if (isMissingQueuesFatal()) {
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
}
catch (FatalListenerStartupException ex) {
logger.error("Consumer received fatal exception on startup", ex);
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishConsumerFailedEvent("Consumer received fatal exception on startup", true, ex);
}
catch (FatalListenerExecutionException ex) { // NOSonAR exception as flow control
logger.error("Consumer received fatal exception during processing", ex);
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex);
}
catch (PossibleAuthenticationFailureException ex) {
logger.error("Consumer received fatal=" + isPossibleAuthenticationFailureFatal() +
" exception during processing", ex);
if (isPossibleAuthenticationFailureFatal()) {
this.startupException =
new FatalListenerStartupException("Authentication failure",
new AmqpAuthenticationException(ex));
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex);
}
catch (ShutdownSignalException e) {
if (RabbitUtils.isNormalShutdown(e)) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());
}
}
else {
logConsumerException(e);
}
}
catch (AmqpIOException e) {
if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
&& e.getCause().getCause().getMessage().contains("in exclusive use")) {
getExclusiveConsumerExceptionLogger().log(logger,
"Exclusive consumer failure", e.getCause().getCause());
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
}
else {
logConsumerException(e);
}
}
catch (Error e) { //NOSONAR
logger.error("Consumer thread error, thread abort.", e);
publishConsumerFailedEvent("Consumer threw an Error", true, e);
getJavaLangErrorHandler().handle(e);
aborted = true;
}
catch (Throwable t) { //NOSONAR
// by now, it must be an exception
if (isActive()) {
logConsumerException(t);
}
}
finally {
if (getTransactionManager() != null) {
ConsumerChannelRegistry.unRegisterConsumerChannel();
}
}
// In all cases count down to allow container to progress beyond startup
this.start.countDown();
killOrRestart(aborted);
if (routingLookupKey != null) {
SimpleResourceHolder.unbind(getRoutingConnectionFactory()); // NOSonAR never null here
}
}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop
private void mainLoop() throws Exception { // NOSonAR Exception
try {
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
// 调整消费者数
checkAdjust(receivedOk);
}
long idleEventInterval = getIdleEventInterval();
if (idleEventInterval > 0) {
if (receivedOk) {
updateLastReceive();
}
else {
long now = System.currentTimeMillis();
long lastalertAt = SimpleMessageListenerContainer.this.lastNoMessagealert.get();
long lastReceive = getLastReceive();
if (now > lastReceive + idleEventInterval
&& now > lastalertAt + idleEventInterval
&& SimpleMessageListenerContainer.this.lastNoMessagealert
.compareAndSet(lastalertAt, now)) {
publishIdleContainerEvent(now - lastReceive);
}
}
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
}
}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#checkAdjust 调整消费者
private void checkAdjust(boolean receivedOk) {
if (receivedOk) {
if (isActive(this.consumer)) {
this.consecutiveIdles = 0;
if (this.consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
considerAddingAConsumer();
this.consecutiveMessages = 0;
}
}
}
else {
this.consecutiveMessages = 0;
if (this.consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
considerStoppingAConsumer(this.consumer);
this.consecutiveIdles = 0;
}
}
}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#considerAddingAConsumer 新增消费者
private void considerAddingAConsumer() {
synchronized (this.consumersMonitor) {
if (this.consumers != null
&& this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) {
long now = System.currentTimeMillis();
if (this.lastConsumerStarted + this.startConsumerMinInterval < now) {
this.addAndStartConsumers(1);
this.lastConsumerStarted = now;
}
}
}
}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#considerStoppingAConsumer 减少消费者
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
synchronized (this.consumersMonitor) {
if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
long now = System.currentTimeMillis();
if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
consumer.basicCancel(true);
this.consumers.remove(consumer);
if (logger.isDebugEnabled()) {
logger.debug("Idle consumer terminating: " + consumer);
}
this.lastConsumerStopped = now;
}
}
}
}
自记录日志信息
try {
initialize();
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart
// 关键代码
for (BlockingQueueConsumer consumer : this.consumers) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
// 提交任务
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
执行任务代码逻辑
private final class AsyncMessageProcessingConsumer implements Runnable {
public void run(){
//主要代码逻辑
try {
initialize();
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
}
}
}
private void mainLoop() throws Exception { // NOSonAR Exception
try {
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
checkAdjust(receivedOk);
}
long idleEventInterval = getIdleEventInterval();
if (idleEventInterval > 0) {
if (receivedOk) {
updateLastReceive();
}
else {
long now = System.currentTimeMillis();
long lastalertAt = SimpleMessageListenerContainer.this.lastNoMessagealert.get();
long lastReceive = getLastReceive();
if (now > lastReceive + idleEventInterval
&& now > lastalertAt + idleEventInterval
&& SimpleMessageListenerContainer.this.lastNoMessagealert
.compareAndSet(lastalertAt, now)) {
publishIdleContainerEvent(now - lastReceive);
}
}
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
}
}
设置了 最大消费个数的时候,发现队列数据很多的时候,并不是直接新增到设置的最大数
private void checkAdjust(boolean receivedOk) {
// 判断是否有推送到消息
if (receivedOk) {
//当前消费者是否存活
if (isActive(this.consumer)) {
this.consecutiveIdles = 0;
// 当前持续的消息数 是否大于默认的配置数
if (this.consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
// 此时执行 新增队列逻辑操作
considerAddingAConsumer();
this.consecutiveMessages = 0;
}
}
}
else {
// 当队列没有收到消息的执行代码逻辑
this.consecutiveMessages = 0;
if (this.consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
// 移除多余的channel
considerStoppingAConsumer(this.consumer);
this.consecutiveIdles = 0;
}
}
}
新增代码逻辑
private void considerAddingAConsumer() {
synchronized (this.consumersMonitor) {
if (this.consumers != null
&& this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) {
long now = System.currentTimeMillis();
if (this.lastConsumerStarted + this.startConsumerMinInterval < now) {
// 新增并开启消费者
this.addAndStartConsumers(1);
this.lastConsumerStarted = now;
}
}
}
}
移除代码逻辑
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
synchronized (this.consumersMonitor) {
if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
long now = System.currentTimeMillis();
if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
consumer.basicCancel(true);
this.consumers.remove(consumer);
if (logger.isDebugEnabled()) {
logger.debug("Idle consumer terminating: " + consumer);
}
this.lastConsumerStopped = now;
}
}
}
}
org.springframework.amqp.rabbit.core.RabbitAdmin#initialize 注册
public void initialize() {
if (this.applicationContext == null) {
this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
return;
}
this.logger.debug("Initializing declarations");
Collection contextExchanges = new linkedList(
this.applicationContext.getBeansOfType(Exchange.class).values());
Collection contextQueues = new linkedList(
this.applicationContext.getBeansOfType(Queue.class).values());
Collection contextBindings = new linkedList(
this.applicationContext.getBeansOfType(Binding.class).values());
Collection customizers =
this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();
processDeclarables(contextExchanges, contextQueues, contextBindings);
final Collection exchanges = filterDeclarables(contextExchanges, customizers);
final Collection queues = filterDeclarables(contextQueues, customizers);
final Collection bindings = filterDeclarables(contextBindings, customizers);
for (Exchange exchange : exchanges) {
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
+ exchange.getName()
+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
+ "reopening the connection.");
}
}
for (Queue queue : queues) {
if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
+ queue.getName()
+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
+ queue.isExclusive() + ". "
+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
+ "alive, but all messages will be lost.");
}
}
if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
this.logger.debug("Nothing to declare");
return;
}
this.rabbitTemplate.execute(channel -> {
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
return null;
});
this.logger.debug("Declarations finished");
}



