栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka的多数据源监听相关操作

kafka的多数据源监听相关操作

我们经常会遇到一个微服务里面想要对多态服务器的kafka进行监听(非集群)这时候平常在application.properties可能就没办法支撑了,我们就需要通过原始方式进行配置

@Configuration
@EnableKafka
public class ConfiguraterKafka {
	// kafka所在服务的ip:端口
    @Value("${kafkaServiceIp1}")
    private String kafkaServiceIp1;

    @Value("${kafkaServiceIp2}")
    private String kafkaServiceIp2;

	// kafka所在服务的groupId多个就配多个 我这里两个一样所以就是一个
    @Value("${kafkaGroupId}")
    private String groupId;

	// 创建kafka的工厂对象1
    @Bean
    KafkaListenerContainerFactory>
    																		 kafkaFactory1(){
        ConcurrentKafkaListenerContainerFactory factory = 
        									new ConcurrentKafkaListenerContainerFactory<>();
        // 创建消费的kafka工厂对象
        ConsumerFactory consumerFactory = new 
        	DefaultKafkaConsumerFactory(kafkaProperties(kafkaServiceIp1,groupId));
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

	// 创建kafka的工厂对象2
    @Bean
    KafkaListenerContainerFactory> 
    																		kafkaFactory2(){
        ConcurrentKafkaListenerContainerFactory factory = 
        										new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory consumerFactory = new 
        	DefaultKafkaConsumerFactory(kafkaProperties(kafkaServiceIp2,groupId));
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

	// kafka的配置
    private Map kafkaProperties(String kafkaServiceIp,String groupId) {
        Map map = new HashMap<>();
        // 这里就是配置kafka的相关参数 比如ip地址和分组这些参数
        Map properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceIp);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return properties;
    }
}

	// topic不用多说就是当前想要消费的topic containFactory就是刚才创建的kafka工厂类注册到ioc的名字
    @KafkaListener(topics = "testTest",containerFactory = "kafkaFactory1")
    public void kafkaConsumer1(String message){
        System.out.println("kafkaFactory1的消息是:"+message);
    }

    @KafkaListener(topics = "testTest",containerFactory = "kafkaFactory2")
    public void kafkaConsumer2(String message){
        System.out.println("kafkaFactory2的消息是:"+message);
    }
// 我往两台服务器的testTest分别塞了一条数据结果如下 都能进行消费
kafkaFactory1的消息是:这是151服务器的数据body
kafkaFactory2的消息是:这是11服务器的数据老铁

对于其他服务器里面的kafka进行发送消息

  • 下面代码都是基于上面代码的补充
// kafkaProperties 添加提供者的key和value相关序列化配置
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

// 创建kafkaTemplate
    @Bean
    KafkaTemplate myKafkaTemplate(){
        DefaultKafkaProducerFactory producerFactory = 
        		new DefaultKafkaProducerFactory<>(kafkaProperties(kafkaServiceIp2, groupId));
        return new KafkaTemplate<>(producerFactory);
    }

    @Autowired
    private KafkaTemplate myKafkaTemplate;

    @KafkaListener(topics = "testTest",containerFactory = "kafkaFactory1")
    public void kafkaConsumer1(String message){
        System.out.println("消费服务器1的数据是:"+message);

        myKafkaTemplate.send("testTest",message);
        System.out.println("提供给服务器2的数据是:"+message);
    }

    @KafkaListener(topics = "testTest",containerFactory = "kafkaFactory2")
    public void kafkaConsumer3(String message){
        System.out.println("消费服务器2的结果是:"+message);
    }
//  往151里面的testTest发送消息,之后结果如下
结果:
消费服务器1的数据是:这是来自151的数据
提供给服务器2的数据是:这是来自151的数据
消费服务器2的结果是:这是来自151的数据
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354842.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号