栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Springboot 集成 kafka,配合使用spring线程池

Springboot 集成 kafka,配合使用spring线程池

提示:文章如有错误的地方请指出,以免误人子弟!

文章目录

一、导入maven jar包二、kafka 安装及使用

安装使用 三、spring线程数配合kafka使用四、参考文章


提示:以下是本篇文章正文内容,下面案例可供参考

一、导入maven jar包

maven 地址,不写版本号也可以,会自动获取。


     org.springframework.kafka
     spring-kafka
     2.8.2
 

二、kafka 安装及使用 安装

这边都使用docker安装,比较方便。

    安装zookeeper

下载镜像

docker pull wurstmeister/zookeeper

启动容器

docker run -d --name zookeeper --publish 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
    安装kafka

下载镜像

docker pull wurstmeister/kafka

启动容器(注意替换自己服务器地址)

docker run -d --name kafka --publish 9092:9092 
--link zookeeper 
--env KAFKA_ZOOKEEPER_CONNECT=192.168.96.135:2181 
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.96.135:9092 
--env KAFKA_ADVERTISED_HOST_NAME=192.168.96.135 
--env KAFKA_ADVERTISED_PORT=9092  
-v /etc/localtime:/etc/localtime 
wurstmeister/kafka
使用
    application.yml
spring:
  application:
    name: ThreadStudyDemo

  main:
    allow-bean-definition-overriding: true
    
  ###########【Kafka集群】###########
  kafka:
    bootstrap-servers: 192.168.96.135:9092
  ###########【初始化生产者配置】###########
  # 重试次数
    producer:
      retries: 2
      # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      acks: 1

      # 批量大小
      batch-size: 16384
      properties:
        # 自定义分区器
#        partitioner.class: com.felix.kafka.producer.CustomizePartitioner
        # 提交延时
        linger:
          ms: 0
      # 当生产端积累的消息达到batch-size或接收到消息linger:ms后,生产者就会将消息提交给kafka
      # linger:ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
      # 生产端缓冲区大小
      buffer-memory: 33554432
      # Kafka提供的序列化和反序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    ###########【初始化消费者配置】###########
    consumer:
      properties:
        # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
        session:
          timeout:
            ms: 120000
        # 消费请求超时时间
        request:
          timeout:
            ms: 180000
        # 默认的消费组ID
        group:
          id: defaultConsumerGroup
      # 是否自动提交offset
      enable-auto-commit: true
      # 提交offset延时(接收到消息后多久提交offset)
      auto-commit-interval: 1000
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset;
      # latest:重置为分区中最新的offset(消费分区中新产生的数据);
      # none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-offset-reset: latest
      # Kafka提供的序列化和反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量消费每次最多消费多少条消息
#      max-poll-records: 50
    listener:
      # 消费端监听的topic不存在时,项目启动会报错(关掉)
      missing-topics-fatal: false
      # 设置批量消费
#      type: batch
    config
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;

import javax.annotation.Resource;


@Configuration
public class KafkaConfig {

    @Resource
    private ConsumerFactory consumerFactory;

    
    @Bean
    public NewTopic createTopic() {
        return new NewTopic("sendMsgTopic", 8, (short)1);
    }

    @Bean
    public NewTopic createTopics() {
        return new NewTopic("sendMessageTopic", 8, (short)1);
    }

    
//    @Bean
//    public NewTopic updateTopic() {
//        return new NewTopic("sendMsgTopic", 10, (short)2);
//    }

    // 新建一个异常处理器,用@Bean注入
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("消费异常:"+message.getPayload());
            return "消费异常:"+message.getPayload();
        };
    }

    
    // 监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止KafkaListener自启动
        container.setAutoStartup(false);
        return container;
    }
}
    正常使用(更详细的参数使用,请看文章结尾的参考链接)
    controller,send消息。
    
    @GetMapping("callback")
    public String callbackConsumptionMsg(String message) {
        kafkaTemplate.send("sendMsgTopic","pic", message).addCallback(
            success -> {
                if (success != null) {
                    // 消息发送到的topic
                    String topic = success.getRecordmetadata().topic();
                    // 消息发送到的分区
                    int partition = success.getRecordmetadata().partition();
                    // 消息在分区内的offset
                    long offset = success.getRecordmetadata().offset();
                    System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
                }
            },
            failure -> {
                System.out.println("发送消息失败:" + failure.getMessage());
            }
        );
        return message;
    }

消费消息 @KafkaListener
首先在该方法所在的类上面加上@Component将该类交由spring管理
errorHandler = "consumerAwareErrorHandler",异常消费方法请查看上面config里面配置的消费异常回调方法,方法名:consumerAwareErrorHandler。

    
    @KafkaListener(id = "manyConsumptionMsg",
            groupId = "defaultConsumerGroup",
            topicPartitions = {@TopicPartition(topic = "sendMsgTopic", partitions = "5")},
            errorHandler = "consumerAwareErrorHandler")
    public void manyConsumptionMsg(ConsumerRecord record) {
        System.out.println("消费图片消息---"+"topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
    }
    定时使用 消费方法
    首先在该方法所在的类上面加上@EnableScheduling启动定时,@Component将该类交由spring管理
    containerFactory = "delayContainerFactory",监听器容器工厂请查看上面config里面配置的方法,方法名:delayContainerFactory。
	@Resource
    private KafkaListenerEndpointRegistry registry;
    
    
    @KafkaListener(id = "sendMessageTopic",
            groupId = "defaultConsumerGroup",
            topicPartitions = {@TopicPartition(topic = "sendMessageTopic", partitions = "5")},
            errorHandler = "consumerAwareErrorHandler",
            containerFactory = "delayContainerFactory")
    public void sendMessageTopic(ConsumerRecord record) {
        System.out.println("消费图片消息---"+"topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
    }

    // 定时启动监听器
    @Scheduled(cron = "0 23 16 4 * ?")
    public void startListener() {
        System.out.println("启动定时监听·······");
        // ”timingCounsumer“是@KafkaListener注解后面设置的监听器id,标识这个监听器
        if (!Objects.requireNonNull(registry.getListenerContainer("sendMessageTopic")).isRunning()) {
            Objects.requireNonNull(registry.getListenerContainer("sendMessageTopic")).start();
        }

//        registry.getListenerContainer("sendMessageTopic").resume();
    }

    // 定时关闭监听器
    @Scheduled(cron = "0 25 16 4 * ?")
    public void shutDownListener() {
        System.out.println("定时关闭监听器·····");
        Objects.requireNonNull(registry.getListenerContainer("sendMessageTopic")).pause();
    }
三、spring线程数配合kafka使用
    config
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;


@Configuration
public class ExecturConfig {
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 获取到服务器的CPU内核
        int i = Runtime.getRuntime().availableProcessors();
        // 核心池大小
        executor.setCorePoolSize(5);
        // 最大线程数
        executor.setMaxPoolSize(100);
        // 队列程度
        executor.setQueueCapacity(1000);
        // 线程空闲时间
        executor.setKeepAliveSeconds(1000);
        // 线程后缀名称
        executor.setThreadNamePrefix("task-asyn");
        
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        return executor;
    }
}
    创建一个线程消费公用方法
    首先在该方法所在的类上面加上@Component将该类交由spring管理
    @Async("taskExecutor") @Async:异步线程注解,taskExecutor:config里配置的线程池信息的方法名
    countDownLatch 计数器配置异步线程使用,让其他线程执行完,再执行主线程。
    @Async("taskExecutor")
    public void outAsyncTest(ConsumerRecord record, CountDownLatch countDownLatch) {
        System.out.println("消费图片消息---"+"topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
        System.out.println(Thread.currentThread().getName() + ":执行了...");
        countDownLatch.countDown();
    }
    配合使用
    首先在该方法所在的类上面加上@Component将该类交由spring管理

kafka单个消息消费

    
    @KafkaListener(id = "manyConsumptionMsg",
            groupId = "defaultConsumerGroup",
            topicPartitions = {@TopicPartition(topic = "sendMsgTopic", partitions = "5")},
            errorHandler = "consumerAwareErrorHandler")
    public void manyConsumptionMsg(ConsumerRecord record) {

        CountDownLatch countDownLatch = new CountDownLatch(1);
        threadUtil.outAsyncTest(record, countDownLatch);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("主线程开始······其他线程执行完毕!");
    }

kafka批量消息消费
首先修改kafka yml配置

    @KafkaListener(id = "manyConsumptionMsg",
            groupId = "defaultConsumerGroup",
            topicPartitions = {@TopicPartition(topic = "sendMsgTopic", partitions = "5")},
            errorHandler = "consumerAwareErrorHandler")
    public void manyConsumptionMsgList(List> recordList) {
        System.out.println("本次消息数量"+ recordList.size());
        // 计数器
        CountDownLatch countDownLatch = new CountDownLatch(recordList.size());
        for (ConsumerRecord record : recordList) {
            threadUtil.outAsyncTest(record, countDownLatch);
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("主线程开始······其他线程执行完毕!");
    }

四、参考文章

kafka 实战
spring 线程池


希望对你有所帮助!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774269.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号