文章目录提示:文章如有错误的地方请指出,以免误人子弟!
一、导入maven jar包二、kafka 安装及使用
安装使用 三、spring线程数配合kafka使用四、参考文章
提示:以下是本篇文章正文内容,下面案例可供参考
一、导入maven jar包maven 地址,不写版本号也可以,会自动获取。
org.springframework.kafka spring-kafka 2.8.2
二、kafka 安装及使用 安装
这边都使用docker安装,比较方便。
- 安装zookeeper
下载镜像
docker pull wurstmeister/zookeeper
启动容器
docker run -d --name zookeeper --publish 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
- 安装kafka
下载镜像
docker pull wurstmeister/kafka
启动容器(注意替换自己服务器地址)
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=192.168.96.135:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.96.135:9092 --env KAFKA_ADVERTISED_HOST_NAME=192.168.96.135 --env KAFKA_ADVERTISED_PORT=9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka使用
- application.yml
spring:
application:
name: ThreadStudyDemo
main:
allow-bean-definition-overriding: true
###########【Kafka集群】###########
kafka:
bootstrap-servers: 192.168.96.135:9092
###########【初始化生产者配置】###########
# 重试次数
producer:
retries: 2
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
acks: 1
# 批量大小
batch-size: 16384
properties:
# 自定义分区器
# partitioner.class: com.felix.kafka.producer.CustomizePartitioner
# 提交延时
linger:
ms: 0
# 当生产端积累的消息达到batch-size或接收到消息linger:ms后,生产者就会将消息提交给kafka
# linger:ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
buffer-memory: 33554432
# Kafka提供的序列化和反序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
###########【初始化消费者配置】###########
consumer:
properties:
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
session:
timeout:
ms: 120000
# 消费请求超时时间
request:
timeout:
ms: 180000
# 默认的消费组ID
group:
id: defaultConsumerGroup
# 是否自动提交offset
enable-auto-commit: true
# 提交offset延时(接收到消息后多久提交offset)
auto-commit-interval: 1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
auto-offset-reset: latest
# Kafka提供的序列化和反序列化类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 批量消费每次最多消费多少条消息
# max-poll-records: 50
listener:
# 消费端监听的topic不存在时,项目启动会报错(关掉)
missing-topics-fatal: false
# 设置批量消费
# type: batch
- config
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import javax.annotation.Resource;
@Configuration
public class KafkaConfig {
@Resource
private ConsumerFactory consumerFactory;
@Bean
public NewTopic createTopic() {
return new NewTopic("sendMsgTopic", 8, (short)1);
}
@Bean
public NewTopic createTopics() {
return new NewTopic("sendMessageTopic", 8, (short)1);
}
// @Bean
// public NewTopic updateTopic() {
// return new NewTopic("sendMsgTopic", 10, (short)2);
// }
// 新建一个异常处理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
System.out.println("消费异常:"+message.getPayload());
return "消费异常:"+message.getPayload();
};
}
// 监听器容器工厂(设置禁止KafkaListener自启动)
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止KafkaListener自启动
container.setAutoStartup(false);
return container;
}
}
- 正常使用(更详细的参数使用,请看文章结尾的参考链接)
controller,send消息。
@GetMapping("callback")
public String callbackConsumptionMsg(String message) {
kafkaTemplate.send("sendMsgTopic","pic", message).addCallback(
success -> {
if (success != null) {
// 消息发送到的topic
String topic = success.getRecordmetadata().topic();
// 消息发送到的分区
int partition = success.getRecordmetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordmetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}
},
failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
}
);
return message;
}
消费消息 @KafkaListener
首先在该方法所在的类上面加上@Component将该类交由spring管理
errorHandler = "consumerAwareErrorHandler",异常消费方法请查看上面config里面配置的消费异常回调方法,方法名:consumerAwareErrorHandler。
@KafkaListener(id = "manyConsumptionMsg",
groupId = "defaultConsumerGroup",
topicPartitions = {@TopicPartition(topic = "sendMsgTopic", partitions = "5")},
errorHandler = "consumerAwareErrorHandler")
public void manyConsumptionMsg(ConsumerRecord, ?> record) {
System.out.println("消费图片消息---"+"topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
- 定时使用 消费方法
首先在该方法所在的类上面加上@EnableScheduling启动定时,@Component将该类交由spring管理
containerFactory = "delayContainerFactory",监听器容器工厂请查看上面config里面配置的方法,方法名:delayContainerFactory。
@Resource
private KafkaListenerEndpointRegistry registry;
@KafkaListener(id = "sendMessageTopic",
groupId = "defaultConsumerGroup",
topicPartitions = {@TopicPartition(topic = "sendMessageTopic", partitions = "5")},
errorHandler = "consumerAwareErrorHandler",
containerFactory = "delayContainerFactory")
public void sendMessageTopic(ConsumerRecord, ?> record) {
System.out.println("消费图片消息---"+"topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
// 定时启动监听器
@Scheduled(cron = "0 23 16 4 * ?")
public void startListener() {
System.out.println("启动定时监听·······");
// ”timingCounsumer“是@KafkaListener注解后面设置的监听器id,标识这个监听器
if (!Objects.requireNonNull(registry.getListenerContainer("sendMessageTopic")).isRunning()) {
Objects.requireNonNull(registry.getListenerContainer("sendMessageTopic")).start();
}
// registry.getListenerContainer("sendMessageTopic").resume();
}
// 定时关闭监听器
@Scheduled(cron = "0 25 16 4 * ?")
public void shutDownListener() {
System.out.println("定时关闭监听器·····");
Objects.requireNonNull(registry.getListenerContainer("sendMessageTopic")).pause();
}
三、spring线程数配合kafka使用
- config
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ExecturConfig {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 获取到服务器的CPU内核
int i = Runtime.getRuntime().availableProcessors();
// 核心池大小
executor.setCorePoolSize(5);
// 最大线程数
executor.setMaxPoolSize(100);
// 队列程度
executor.setQueueCapacity(1000);
// 线程空闲时间
executor.setKeepAliveSeconds(1000);
// 线程后缀名称
executor.setThreadNamePrefix("task-asyn");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
}
}
- 创建一个线程消费公用方法
首先在该方法所在的类上面加上@Component将该类交由spring管理
@Async("taskExecutor") @Async:异步线程注解,taskExecutor:config里配置的线程池信息的方法名
countDownLatch 计数器配置异步线程使用,让其他线程执行完,再执行主线程。
@Async("taskExecutor")
public void outAsyncTest(ConsumerRecord, ?> record, CountDownLatch countDownLatch) {
System.out.println("消费图片消息---"+"topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
System.out.println(Thread.currentThread().getName() + ":执行了...");
countDownLatch.countDown();
}
- 配合使用
首先在该方法所在的类上面加上@Component将该类交由spring管理
kafka单个消息消费
@KafkaListener(id = "manyConsumptionMsg",
groupId = "defaultConsumerGroup",
topicPartitions = {@TopicPartition(topic = "sendMsgTopic", partitions = "5")},
errorHandler = "consumerAwareErrorHandler")
public void manyConsumptionMsg(ConsumerRecord, ?> record) {
CountDownLatch countDownLatch = new CountDownLatch(1);
threadUtil.outAsyncTest(record, countDownLatch);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程开始······其他线程执行完毕!");
}
kafka批量消息消费
首先修改kafka yml配置
@KafkaListener(id = "manyConsumptionMsg",
groupId = "defaultConsumerGroup",
topicPartitions = {@TopicPartition(topic = "sendMsgTopic", partitions = "5")},
errorHandler = "consumerAwareErrorHandler")
public void manyConsumptionMsgList(List> recordList) {
System.out.println("本次消息数量"+ recordList.size());
// 计数器
CountDownLatch countDownLatch = new CountDownLatch(recordList.size());
for (ConsumerRecord, ?> record : recordList) {
threadUtil.outAsyncTest(record, countDownLatch);
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程开始······其他线程执行完毕!");
}
四、参考文章
kafka 实战
spring 线程池
希望对你有所帮助!



