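Embedded Kafka tests work for me with the configuration below.

Annotations on the test class:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    import org.springframework.kafka.test.rule.KafkaEmbedded;

    @EnableKafka
    // Specify the @KafkaListener class here if it is not this class
    // and is not already loaded by the test configuration.
    @SpringBootTest(classes = {KafkaController.class})
    @EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        brokerProperties = {
            "listeners=PLAINTEXT://localhost:3333",
            "port=3333"
        })
    public class KafkaConsumerTest {

        @Autowired
        KafkaEmbedded kafkaEmbeded;

        @Autowired
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

Setup method annotated with @Before: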
    @Before
    public void setUp() throws Exception {
        // Wait until every listener container has been assigned its partitions,
        // so that messages sent by the test are not missed.
        for (MessageListenerContainer messageListenerContainer :
                kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbeded.getPartitionsPerTopic());
        }
    }

Note: I am not using @ClassRule to create the embedded Kafka broker; instead I autowire it with @Autowired.

    @Test
    public void testReceive() throws Exception {
        kafkaTemplate.send(topic, data);
    }

Hope this helps!
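Edit: the test configuration class is marked with @TestConfiguration:

    @TestConfiguration
    public class TestConfig {

        // Assumption: the embedded broker is injected here so that
        // producerProps can read its address.
        @Autowired
        private KafkaEmbedded kafkaEmbedded;

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
            // topic is the name of the test topic, defined elsewhere.
            kafkaTemplate.setDefaultTopic(topic);
            return kafkaTemplate;
        }
    }

Now the @Test method can use the autowired KafkaTemplate to send a message:

    kafkaTemplate.send(topic, data);

I have updated the code block in the answer with the line above.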
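The testReceive method in this answer only sends a message; it does not check that the listener actually received it. One common way to verify asynchronous delivery is a CountDownLatch that the listener counts down. The sketch below is plain Java and not part of the original answer: the class LatchingListener and its methods are hypothetical names, and in a real Spring Kafka test the onMessage method would presumably carry a @KafkaListener annotation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical listener, for illustration only. In a Spring Kafka test,
 * onMessage would typically be annotated with @KafkaListener.
 */
final class LatchingListener {

    // Released once the first message arrives.
    private final CountDownLatch latch = new CountDownLatch(1);

    // Written by the listener thread, read by the test thread.
    private volatile String lastPayload;

    /** Called (by the container, in a real setup) for each received message. */
    void onMessage(String payload) {
        lastPayload = payload;
        latch.countDown();
    }

    /** Returns true if a message arrived within the timeout, false otherwise. */
    boolean awaitMessage(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that nothing was received.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** The payload of the most recent message, or null if none arrived. */
    String lastPayload() {
        return lastPayload;
    }
}
```

With something like this in place, the test could call kafkaTemplate.send(topic, data) and then assert that awaitMessage returns true within a few seconds, rather than finishing as soon as the send call returns.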



