我们经常会遇到一个微服务里面想要对多台服务器的kafka进行监听(非集群),这时候平常的application.properties配置可能就没办法支撑了,我们就需要通过原始方式进行配置
@Configuration
@EnableKafka
public class ConfiguraterKafka {
// kafka所在服务的ip:端口
@Value("${kafkaServiceIp1}")
private String kafkaServiceIp1;
@Value("${kafkaServiceIp2}")
private String kafkaServiceIp2;
// kafka所在服务的groupId多个就配多个 我这里两个一样所以就是一个
@Value("${kafkaGroupId}")
private String groupId;
// 创建kafka的工厂对象1
@Bean
KafkaListenerContainerFactory>
kafkaFactory1(){
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
// 创建消费的kafka工厂对象
ConsumerFactory consumerFactory = new
DefaultKafkaConsumerFactory(kafkaProperties(kafkaServiceIp1,groupId));
factory.setConsumerFactory(consumerFactory);
return factory;
}
// 创建kafka的工厂对象2
@Bean
KafkaListenerContainerFactory>
kafkaFactory2(){
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
ConsumerFactory consumerFactory = new
DefaultKafkaConsumerFactory(kafkaProperties(kafkaServiceIp2,groupId));
factory.setConsumerFactory(consumerFactory);
return factory;
}
// kafka的配置
private Map kafkaProperties(String kafkaServiceIp,String groupId) {
Map map = new HashMap<>();
// 这里就是配置kafka的相关参数 比如ip地址和分组这些参数
Map properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceIp);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return properties;
}
}
// topic不用多说就是当前想要消费的topic containerFactory就是刚才创建的kafka工厂类注册到ioc的名字
// Consumes topic "testTest" from server 1 through the kafkaFactory1 container.
@KafkaListener(topics = "testTest", containerFactory = "kafkaFactory1")
public void kafkaConsumer1(String message) {
    String line = "kafkaFactory1的消息是:" + message;
    System.out.println(line);
}
// Consumes topic "testTest" from server 2 through the kafkaFactory2 container.
@KafkaListener(topics = "testTest", containerFactory = "kafkaFactory2")
public void kafkaConsumer2(String message) {
    String line = "kafkaFactory2的消息是:" + message;
    System.out.println(line);
}
// 我往两台服务器的testTest分别塞了一条数据结果如下 都能进行消费
kafkaFactory1的消息是:这是151服务器的数据body
kafkaFactory2的消息是:这是11服务器的数据老铁
向其他服务器里面的kafka发送消息
- 下面代码都是基于上面代码的补充
// Added inside kafkaProperties: producer-side key/value serializers, so the
// same config map can also back a DefaultKafkaProducerFactory.
// (Kafka ignores unknown-side settings, so mixing consumer and producer keys works.)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 创建kafkaTemplate
/**
 * Template for publishing to the second server's Kafka.
 *
 * NOTE(review): this reuses kafkaProperties(), which is primarily consumer
 * config; it only works as producer config because the key/value serializer
 * entries were added to that map (see the snippet above this bean).
 */
@Bean
KafkaTemplate<String, String> myKafkaTemplate() {
    DefaultKafkaProducerFactory<String, String> producerFactory =
            new DefaultKafkaProducerFactory<>(kafkaProperties(kafkaServiceIp2, groupId));
    return new KafkaTemplate<>(producerFactory);
}
// Template bean defined above; used below to forward messages to server 2.
@Autowired
private KafkaTemplate myKafkaTemplate;
// Bridge listener: consume a record from server 1, then re-publish the same
// payload to server 2 via myKafkaTemplate.
@KafkaListener(topics = "testTest", containerFactory = "kafkaFactory1")
public void kafkaConsumer1(String message) {
    final String forwardTopic = "testTest";
    System.out.println("消费服务器1的数据是:" + message);
    myKafkaTemplate.send(forwardTopic, message);
    System.out.println("提供给服务器2的数据是:" + message);
}
// Verifies the bridge: prints whatever arrives on server 2's "testTest" topic.
@KafkaListener(topics = "testTest", containerFactory = "kafkaFactory2")
public void kafkaConsumer3(String message) {
    String line = "消费服务器2的结果是:" + message;
    System.out.println(line);
}
// 往151里面的testTest发送消息,之后结果如下
结果:
消费服务器1的数据是:这是来自151的数据
提供给服务器2的数据是:这是来自151的数据
消费服务器2的结果是:这是来自151的数据



