栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

从数据库中获取需要监听的Kafka topic

从数据库中获取需要监听的Kafka topic

背景:Kafka监听数据库中配置topic数据,并将数据转换后进行kafka数据转发

1.application.yml

test:
  kafka:
    bootstrapServers: 192.168.11.1:9092,192.168.11.2:9092
    consumer:
      group.id: group-id
      auto.offset.reset: latest
      enable.auto.commit: true
    producer:
      max.in.flight.requests.per.connection: 1
      acks: -1
      buffer.memory: 33554432
      batch.size: 16384
      linger.ms: 100
      retries: 1

2.KafkaConfig.java

@Component
@ConfigurationProperties(prefix = "test.kafka")
@Data
@Slf4j
public class KafkaConfig implements InitializingBean {

    private String bootstrapServers;

    private Properties consumer;
    private Properties producer;

    @Override
    public void afterPropertiesSet() throws Exception {
        Properties consumerProperties = getConsumerProperties();
        Properties producerProperties = getProducerProperties();
    }

    public Properties getConsumerProperties() {
        Properties p = new Properties();
        p.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 500);
        p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 70000);
        p.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
        p.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        p.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 70000);
        p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        p.putAll(consumer);
        return p;
    }

    public Properties getProducerProperties(){
        Properties p = new Properties();
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        p.putAll(producer);
        return p;
    }
}

3.KafkaReceiver.java

@Slf4j
@Component("kafkaReceiver")
public class KafkaReceiver extends Thread {

    private KafkaConfig kafkaConfig;

    @Autowired
    public KafkaReceiver (KafkaConfig kafkaConfig) {
        this.kafkaConfig= kafkaConfig;
    }
 
    @Override
    public void run() {
        log.info("启动kafka consumer ...");
        KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getConsumerProperties());
        Set topics ; //TODO 从数据库中查询的topic
        consumer.subscribe(topics );

        KafkaProducer producer = new KafkaProducer(kafkaConfig.getProducerProperties());
        while (true) {
            try {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord record : records) {
                    String topic = record.topic();
                    String msg = record.value();
                    log.info("topic={},offset={},value={}", topic, record.offset(), msg);
                    //TODO 在这里可以进行kafka消息转换。
            
                    String targetTopic ;//转发的目的topic
                    ProducerRecord producerRecord = new ProducerRecord(targetTopic, key, msg);
                    producer.send(producerRecord, (Recordmetadata metadata, Exception exception) -> {
                        if (exception == null) {
                            log.info("send success topic={},offset={},key={}", metadata.topic(), metadata.offset(), key);
                        } else {
                            log.error("send failure key={}", key);
                        }
                    });
                }
            } catch (Exception e) {
                log.error("", e);
            }
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758088.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号