栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

springboot消费kafka Listener method could not be invoked with the incoming message

springboot消费kafka Listener method could not be invoked with the incoming message

bug背景:使用mysql cdc kafka 处理消息,当mysql删除一条记录时会触发此bug
错误信息:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.flex.notify.listener.NotifyListener.preOrder(java.lang.String)]
Bean [com.flex.notify.listener.NotifyListener@ce6cd62]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.flex.notify.listener.NotifyListener.preOrder(java.lang.String): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]] , failedMessage=GenericMessage [payload=org.springframework.kafka.support.KafkaNull@4e24013e, headers={kafka_offset=1807, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@852319e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey={"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"id"}],"optional":false,"name":"debezium_connector.flex.pre_order.Key"},"payload":{"id":"062c68e78adfa1e5c2ef130603bad10e"}}, kafka_receivedTopic=debezium_connector.flex.pre_order, kafka_receivedTimestamp=1648020906745, kafka_groupId=notify1}]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.flex.notify.listener.NotifyListener.preOrder(java.lang.String): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]] , failedMessage=GenericMessage [payload=org.springframework.kafka.support.KafkaNull@4e24013e, headers={kafka_offset=1807, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@852319e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey={"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"id"}],"optional":false,"name":"debezium_connector.flex.pre_order.Key"},"payload":{"id":"062c68e78adfa1e5c2ef130603bad10e"}}, kafka_receivedTopic=debezium_connector.flex.pre_order, kafka_receivedTimestamp=1648020906745, kafka_groupId=notify1}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) [spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2604) [spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2565) [spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2492) [spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2402) [spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2281) [spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1955) [spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) [spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) [spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) [spring-kafka-2.8.3.jar:2.8.3]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_302]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[na:1.8.0_302]
	at java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:1.8.0_302]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_302]
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:374) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:355) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2584) [spring-kafka-2.8.3.jar:2.8.3]
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.flex.notify.listener.NotifyListener.preOrder(java.lang.String): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]] 
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:122) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2584) [spring-kafka-2.8.3.jar:2.8.3]
	... 12 common frames omitted

解决方法:加上 @Payload(required = false)

原因:

这里发现删除一条mysql记录后 kafka会产生两个消息,第一个消息正常有内容,第二个消息为null

具体原因需要看下cdc实现,这里不做说明了

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774462.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号