
Kafka from Getting Started to Giving Up (Part 7): Consuming Messages from an Authenticated Cluster under the Spring Framework (Dynamic Configuration)


Earlier posts covered building an authenticated cluster and producing messages against it. This one covers consuming messages in authenticated-cluster mode. While writing it, my project happened to need to listen to several topics at once, where each topic had its own group id, username, and password, so this post implements dynamic configuration of multiple listener groups; the same approach extends to making any other setting configurable per group. Talk is cheap.

Configuration:

kafka:
  consumer:
    # topic::groupId::username::password; entries separated by commas
    cp: topic1::groupid1::user1::pwd1,topic2::groupid2::user2::pwd2
    # authentication settings
    security:
      # security protocol (must match the authenticated-cluster setup)
      protocol: SASL_PLAINTEXT
      # SASL mechanism (must match the authenticated-cluster setup)
      sasl-mechanism: SCRAM-SHA-256
    bootstrap-servers: IP1:PORT1,IP2:PORT2,IP3:PORT3
    # offset auto-commit interval
    auto-commit-interval-ms: 1000
    # where to start when no valid offset exists: latest = newest messages, earliest = oldest
    auto-offset-reset: earliest
    # whether to fetch messages in batches (selects batch vs. single listener -- custom property)
    batch-listener: true
    # number of concurrent consumers per container
    concurrency: 5
    # whether to auto-commit offsets (tells Kafka how far this consumer group has read)
    enable-auto-commit: true
    # maximum wait before the broker answers a fetch; the broker responds once this or fetch-min-bytes is satisfied
    fetch-max-wait-ms: 1000
    # minimum bytes the broker should return per fetch
    fetch-min-bytes: 1048576
    # maximum records returned by a single poll
    max-poll-records: 10000
    # maximum interval between polls before the consumer is considered failed
    max-poll-time-out: 30000
    # partition assignment strategy: org.apache.kafka.clients.consumer.RangeAssignor or org.apache.kafka.clients.consumer.RoundRobinAssignor
    partition-assignment-strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
    session-time-out-ms: 30000
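The `cp` property packs four fields per listener: entries are comma-separated, and the fields inside one entry are `::`-separated. A minimal, framework-free sketch of that parsing step (the `CpParser` and `ListenerSpec` names are my own, not from the original code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CpParser {
    /** One listener definition: topic::groupId::username::password. */
    public record ListenerSpec(String topic, String groupId, String username, String password) {}

    /** Splits the comma-separated cp property into ListenerSpec entries. */
    public static List<ListenerSpec> parse(String cp) {
        return Arrays.stream(cp.split(","))
            .map(entry -> {
                String[] parts = entry.split("::");
                if (parts.length != 4) {
                    throw new IllegalArgumentException("Expected topic::groupId::user::pwd, got: " + entry);
                }
                return new ListenerSpec(parts[0], parts[1], parts[2], parts[3]);
            })
            .collect(Collectors.toList());
    }
}
```

Validating the field count up front gives a clear error at startup instead of an `ArrayIndexOutOfBoundsException` deep inside the container setup.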

Configuration code (written in a hurry; this class could be refactored with @ConfigurationProperties):

@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {
    private static final Integer CONSUMER_CONFIGS_COUNT = 15;
    @Value("${kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("#{'${kafka.consumer.cp}'.split(',')}")
    private List<String> cpList;

    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean autoCommit;

    @Value("${kafka.consumer.auto-commit-interval-ms}")
    private Integer autoCommitIntervalMs;

    @Value("${kafka.consumer.concurrency}")
    private Integer concurrency;

    @Value("${kafka.consumer.batch-listener}")
    private boolean batchListener;

    @Value("${kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${kafka.consumer.fetch-min-bytes}")
    private Integer fetchMinBytes;

    @Value("${kafka.consumer.fetch-max-wait-ms}")
    private Integer fetchMaxWaitMs;

    @Value("${kafka.consumer.max-poll-time-out}")
    private Integer maxPollTimeOut;

    @Value("${kafka.consumer.session-time-out-ms}")
    private Integer sessionTimeOutMs;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.partition-assignment-strategy}")
    private String partitionAssignor;

    @Value("${kafka.consumer.security.protocol}")
    private String protocol;

    @Value("${kafka.consumer.security.sasl-mechanism}")
    private String mechanism;

    @Autowired
    private KafkaReceiverBatch kafkaReceiverBatch;

    @Autowired
    private KafkaReceiverSingle kafkaReceiverSingle;

    private static final String P_NAME = "password";

    public KafkaConsumerConfig() { log.info("Loading Kafka consumer configuration..."); }

    @Bean
    public String kafkaConsumerContainers() {
        if (CollectionUtils.isEmpty(cpList)) {
            return null;
        }

        for (String batchProp : cpList) {
            // fields are separated by "::": topic, groupId, username, password
            String[] batchArray = batchProp.split("::");
            ContainerProperties containerProperties = new ContainerProperties(batchArray[0]);
            if (batchListener) {
                containerProperties.setMessageListener(
                    (BatchMessageListener<String, String>) data -> kafkaReceiverBatch.consumer(data));
            } else {
                containerProperties.setMessageListener(
                    (MessageListener<String, String>) data -> kafkaReceiverSingle.consumer(data));
            }
            ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
                consumerFactory(batchArray[1], batchArray[2], batchArray[3]), containerProperties);
            container.setConcurrency(concurrency);
            container.start();
        }

        return "Kafka consumer containers started";
    }

    private ConsumerFactory<String, String> consumerFactory(String groupId, String userName, String pwd) {
        return new DefaultKafkaConsumerFactory<>(consumerProperties(groupId, userName, pwd));
    }

    private Map<String, Object> consumerProperties(String groupId, String userName, String pwd) {
        Map<String, Object> props = new HashMap<>(CONSUMER_CONFIGS_COUNT);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinBytes);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWaitMs);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollTimeOut);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOutMs);
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partitionAssignor);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.StringDeserializer.class);

        // authentication
        props.put("security.protocol", protocol);
        props.put("sasl.mechanism", mechanism);
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
            + userName + "\" " + P_NAME + "=\"" + pwd + "\";");
        return props;
    }
}
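The `sasl.jaas.config` value must wrap the username and password in escaped double quotes and end with a semicolon; getting the escaping wrong is a common cause of login failures at startup. A small helper (the `JaasConfigBuilder` name is hypothetical, not part of the original code) that isolates the string building:

```java
public class JaasConfigBuilder {
    private static final String SCRAM_TEMPLATE =
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    /** Builds the sasl.jaas.config property value for SCRAM authentication. */
    public static String scramJaasConfig(String username, String password) {
        return String.format(SCRAM_TEMPLATE, username, password);
    }
}
```

Keeping the template in one constant makes the quoting rules obvious and avoids repeating fragile concatenation at each call site.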

Batch message handler [KafkaReceiverBatch]

@Component
@Slf4j
public class KafkaReceiverBatch {
    public void consumer(List<ConsumerRecord<String, String>> consumerRecords) {
        try {
            Map<String, List<String>> topic2Messages = new HashMap<>(consumerRecords.size());
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                String message = consumerRecord.value();
                if (message != null) {
                    topic2Messages.computeIfAbsent(consumerRecord.topic(), k -> new ArrayList<>()).add(message);
                }
            }
            // TODO process messages
        } catch (Exception e) {
            log.error("Failed to consume batch", e);
        }
    }
}
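The per-topic grouping above can also be written with `Collectors.groupingBy`. A framework-free sketch, using a plain record as a stand-in for `ConsumerRecord` so it runs without the Kafka client on the classpath (the `Msg` and `TopicGrouper` names are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class TopicGrouper {
    /** Minimal stand-in for ConsumerRecord, carrying only topic and value. */
    public record Msg(String topic, String value) {}

    /** Groups non-null message values by topic, mirroring the loop above. */
    public static Map<String, List<String>> groupByTopic(List<Msg> records) {
        return records.stream()
            .filter(r -> Objects.nonNull(r.value()))
            .collect(Collectors.groupingBy(Msg::topic,
                Collectors.mapping(Msg::value, Collectors.toList())));
    }
}
```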
 

Single message handler [KafkaReceiverSingle]

@Component
@Slf4j
public class KafkaReceiverSingle {
    public void consumer(ConsumerRecord<String, String> consumerRecord) {
        try {
            String message = consumerRecord.value();
            if (message != null) {
                // TODO process message
            }
        } catch (Exception e) {
            log.error("Failed to consume record", e);
        }
    }
}
When reposting, please credit the source: www.mshxw.com