之前写了认证集群搭建和认证集群模式下生产消息的内容。本次将介绍认证集群模式下的消费消息。写博文的时候刚好项目涉及到同时要监听多个topic,而且不同topic对应不同的groupId以及用户名密码的情况。所以本次实现了动态配置多组监听的场景,同理其它配置也可以改造成多套。Talk is cheap, show me the code.
配置内容:
kafka:
consumer:
# Each entry is topic::groupId::username::password; entries are comma-separated
cp: topic1::groupid1::user1::pwd1,topic2::groupid2::user2::pwd2
# SASL credentials / security settings
security:
# Security protocol (must match the value used when the authenticated cluster was set up)
protocol: SASL_PLAINTEXT
# SASL mechanism (must match the value used when the authenticated cluster was set up)
sasl-mechanism: SCRAM-SHA-256
bootstrap-servers: IP1:PORT1,IP2:PORT2,IP3:PORT3
# Offset auto-commit interval (ms)
auto-commit-interval-ms: 1000
# When the stored offset is invalid: latest = read from the newest message, earliest = read from the oldest
auto-offset-reset: earliest
# Whether to fetch messages in batches (selects the batch vs single listener -- custom property, not a native Kafka key)
batch-listener: true
# Listener container concurrency (number of consumer threads)
concurrency: 5
# Whether to auto-commit offsets (tells Kafka how far this consumer group has read)
enable-auto-commit: true
# Max wait before a fetch returns; the broker answers when this OR fetch-min-bytes is satisfied
fetch-max-wait-ms: 1000
# Minimum bytes the broker should accumulate before answering a fetch
fetch-min-bytes: 1048576
# Maximum records returned by a single poll
max-poll-records: 10000
# Maximum interval between polls before the consumer is considered failed (ms)
max-poll-time-out: 30000
# Partition assignment strategy: org.apache.kafka.clients.consumer.RangeAssignor or org.apache.kafka.clients.consumer.RoundRobinAssignor
partition-assignment-strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
session-time-out-ms: 30000
配置代码:(写得仓促,这个类后续可以用@ConfigurationProperties改造一下)
@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {
private static final Integer CONSUMER_CONFIGS_COUNT = 15;
@Value("${kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("#{'${kafka.consumer.cp}'.split(',')}")
private List cpList;
@Value("${kafka.consumer.enable-auto-commit}")
private boolean autoCommit;
@Value("${kafka.consumer.auto-commit-interval-ms}")
private Integer autoCommitIntervalMs;
@Value("${kafka.consumer.concurrency}")
private Integer concurrency;
@Value("${kafka.consumer.batch-listener}")
private boolean batchListener;
@Value("${kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
@Value("${kafka.consumer.fetch-min-bytes}")
private Integer fetchMinBytes;
@Value("${kafka.consumer.fetch-max-wait-ms}")
private Integer fetchMaxWaitMs;
@Value("${kafka.consumer.max-poll-time-out}")
private Integer maxPollTimeOut;
@Value("${kafka.consumer.session-time-out-ms}")
private Integer sessionTimeOutMs;
@Value("${kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.partition-assignment-strategy}")
private String partitionAssignor;
@Value("${kafka.consumer.security.protocol}")
private String protocol;
@Value("${kafka.consumer.security.sasl-mechanism}")
private String mechanism;
@Autowired
private KafkaReceiverBatch kafkaReceiverBatch;
@Autowired
private KafkaReceiverSingle kafkaReceiverSingle;
private static final String P_NAME = "password";
public KafkaConsumerConfig() { log.info("Kafka消费者配置加载..."); }
@Bean
public String KafkaConsumerFactory() {
if (CollectionUtils.isEmpty(cpList)) {
return null;
}
for (String batchProp : cpList) {
// 配置之间用::分隔,分别是topic,group,name,pwd
String[] batchArray = batchProp.split("::");
ContainerProperties containerProperties = new ContainerProperties(batchArray[0]);
if (batchListener) {
containerProperties.setMessageListener(
(BatchMessageListener)data -> kafkaReceiverBatch.consumer(data));
} else {
containerProperties
.setMessageListener((MessageListener)data -> kafkaReceiverSingle.consumer(data));
}
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(
consumerFactory(batchArray[1], batchArray[2], batchArray[3]), containerProperties);
container.setConcurrency(concurrency);
container.start();
}
return "Kafka消费者加载完成";
}
private ConsumerFactory consumerFactory(String groupId, String userName, String pwd) {
return new DefaultKafkaConsumerFactory(consumerProperties(groupId, userName, pwd));
}
private Map consumerProperties(String groupId, String userName, String pwd) {
Map props = new HashMap<>(CONSUMER_CONFIGS_COUNT);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinBytes);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWaitMs);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollTimeOut);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOutMs);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partitionAssignor);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringDeserializer.class);
// 认证
props.put("security.protocol", protocol);
props.put("sasl.mechanism", mechanism);
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=""
+ userName + "" " + P_NAME + "="" + pwd + "";");
return props;
}
}
批量消息处理类[KafkaReceiverBatch]
public class KafkaReceiverBatch {
public void consumer(List> consumerRecords) {
try {
Map> topic2Messages = new HashMap<>(consumerRecords.size());
for (ConsumerRecord consumerRecord : consumerRecords) {
String topic = consumerRecord.topic();
List
单个消息处理类[KafkaReceiverSingle]
public class KafkaReceiverSingle {
public void consumer(ConsumerRecord consumerRecord) {
try {
Optional> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
Object message = consumerRecord.value();
// TODO 处理消息
}
} catch (Exception e) {
e.printStackTrace();
}
}
}



