根据kafka的生产消费模型,kafka的topic可以创建成多个分区,一个消费者组内,一个消费者是可以对应多个分区的,但是通常出于性能考虑,最好让一个分区能对应到一个consumer。
一种实现思路是:topic有多少个分区,然后consumer节点就启动多少个,这样多少会有些浪费。如果是微服务应用,这个应用只做消费数据这一件事情,分配1-2个G的内存资源还是有些浪费的。
另外一种思路就是:单个应用实例启动多个线程,然后多个线程分别对应一个分区。
代码配置实现这里使用 spring-kafka 框架进行配置。
在 pom.xml 中引入 spring-kafka 依赖:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.7.RELEASE</version>
</dependency>
```
配置类代码
package com.xxx.es.job.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.stereotype.Component;

import com.xxx.es.job.mq.PointsTransactionDetailKafkaListenerV2;
@Configurable
@Component
@EnableKafka
public class KafkaConsumerConfig {
//Properties properties = PropertiesUtils.read("kafka.properties");
public KafkaConsumerConfig() {
System.out.println("kafka消费者配置加载...");
}
public Map consumerProperties() {
Map props = new HashMap<>();
//Kafka服务地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
//消费组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "point3");
//设置
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//设置间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//Key反序列化类
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//Value反序列化
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//从头开始消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
public KafkaListenerContainerFactory> kafkaListenerContainerFactory(ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3); // 设置这里不会生效。
System.out.println("factory.setConcurrency(3)");
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public PointsTransactionDetailKafkaListenerV2 kafkaConsumerListener() {
return new PointsTransactionDetailKafkaListenerV2();
}
@Bean
public ConcurrentMessageListenerContainer container(
ConsumerFactory consumerFactory, PointsTransactionDetailKafkaListenerV2 kafkaConsumerListener) {
// ContainerProperties containerProperties = new ContainerProperties("points_dev_scj_test.points_sharding_00.points_transaction_detail");
ContainerProperties containerProperties = new ContainerProperties("partition_test");
containerProperties.setMessageListener(kafkaConsumerListener);
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
ConcurrentMessageListenerContainer listenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
listenerContainer.setConcurrency(3); //关键点,设置这里才能生效,这里会创建3个消费者线程
return listenerContainer;
}
}
需要注意的是:上面配置类中 kafkaListenerContainerFactory 方法里设置的 setConcurrency(3)(见下面这一部分)是不会生效的,也就是不会创建多个消费者线程。
```java
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(3); // 这里设置的 setConcurrency(3) 不会生效
    System.out.println("factory.setConcurrency(3)");
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}
```
启动项目,可以在控制台看到打印的日志。



