我们经常会遇到一个微服务里面想要对多态服务器的kafka进行监听(非集群)这时候平常在application.properties可能就没办法支撑了,我们就需要通过原始方式进行配置
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Value("${brokerAddress}") // kafka所在地址ip:port
private String brokerAddress;
@Value("${groupId}") // kafka的分组Id
private String groupId;
// 这里的工厂类配置可以配置多个,想要今天多少台服务器的kafka就可以写多少个
@Bean
KafkaListenerContainerFactory> consumerFactory(){
// 创建kafka的工厂类
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
// 创建kafka的消费工厂类 这里也可以使kafka的提供者工厂类
ConsumerFactory consumerFactory =
new DefaultKafkaConsumerFactory(factoryProperties(groupId,brokerAddress));
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1);
// 将当前kafka的工厂对象返回装载到ioc
return factory;
}
private Map factoryProperties(String groupId, String brokerAddress) {
// 这里就是配置kafka的相关参数 比如ip地址和分组这些参数
Map properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
return properties;
}
}
// 监听类
// 这里的topic名字就是当前配置kafka里面的topic
// containerFactory就是配置的kafka工厂对象在ioc的名字
@KafkaListener(topics = "topicName",containerFactory =
"consumerFactory")
public void kafkaTest(String message){
// 对监听到的消息进行操作
...
}



