栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

springboot集成kafka、消息发送、消费使用

springboot集成kafka、消息发送、消费使用

文章目录

简介jar引入配置文件KafkaConfiguration消费工程配置kafka消息发送、消费示例

简介
	本示例用于kafka在springboot中的配置、消息发送及消息消费使用代码示例。
jar引入

代码示例:

	
    
        org.springframework.kafka
        spring-kafka
    
配置文件
#kafka配置
#指定kafka代理地址(集群配多个、中间、逗号隔开)
spring.kafka.bootstrap-servers=ip:9092

#producer生产环境配置===========================
#重试次数
spring.kafka.producer.retries=1
#默认批量大小(produce积累到一定数据,一次发送)
spring.kafka.producer.batch-size=16384
#缓冲总内存大小(32M)
spring.kafka.producer.buffer-memory=33554432
#kafka原生的StringSerializer编码序列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#kafka原生的StringSerializer解码序列化方式
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#consumer消费环境配置===========================
#消费者标识字符串(自定义、标记消费者是谁)
spring.kafka.consumer.boot.group-id=boot_group_id
#kafka偏移量设置(earliest:从头开始消费)
spring.kafka.consumer.auto-offset-reset=earliest
#在一次 poll() 调用中返回的最大记录数
spring.kafka.consumer.max-poll-records=100
#设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#消费者偏移自动提交给Kafka的频率(以毫秒为单位)、默认值为5000
spring.kafka.consumer.auto-commit-interval=1000
#kafka原生的StringSerializer编码序列化方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#kafka原生的StringSerializer解码序列化方式
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
KafkaConfiguration消费工程配置

代码示例:

package com.gxl.springbootproject.config.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;


@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Value("${spring.kafka.consumer.boot.group-id}")
    private String bootGroupId;

    
    @Bean
    public Map consumerConfigs() throws ClassNotFoundException {
        Map props = new HashMap<>();
        //指定kafka代理地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //kafka偏移量设置
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        //在一次 poll() 调用中返回的最大记录数
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //设置自动提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        //消费者偏移自动提交给Kafka的频率(以毫秒为单位)、默认值为5000
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        //kafka原生的StringSerializer编码序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Class.forName(keyDeserializer));
        //kafka原生的StringSerializer解码序列化方式
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Class.forName(valueDeserializer));
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        return props;
    }

    
    @Bean
    public KafkaListenerContainerFactory boot_batchFactory() throws ClassNotFoundException {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        Map props = consumerConfigs();
        //指定消费者group-id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, bootGroupId);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

}
kafka消息发送、消费示例

代码示例:

package com.gxl.springbootproject.controller;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;


@Api(tags = "kafka接口管理")
@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {

    @Resource
    KafkaTemplate kafkaTemplate;

    @ApiOperation("kafka消息发送")
    @PostMapping("/send/message")
    public void send(@RequestParam("message") String message){
        //kafka消息发送(topic:【自定义,与消费者topic一致】、message【消息内容】)
        kafkaTemplate.send("boot_topic",message);
        log.info("kafka消息发送成功,message=" + message);
    }

    
    @KafkaListener(topics = "boot_topic", containerFactory = "boot_batchFactory")
    public void bootTopic(String message){
        log.info("kafka消息接收成功,message=" + message);
    }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784369.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号