栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

springboot kafka 整合使用 简单demo

springboot kafka 整合使用 简单demo

一.基本版本信息

springboot2.5.5    kafka.2.4.1

二.代码部分

2.1pom.xml

4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.5.5
         
    
    com.daqu
    springboot-kafka
    0.0.1-SNAPSHOT
    springboot-kafka
    Demo project for Spring Boot
    
        1.8
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.kafka
            spring-kafka
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            org.springframework.kafka
            spring-kafka-test
            test
        
        
            org.projectlombok
            lombok
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

2.2application.properties 文件中参数 是可以设置在KafkaTemplate 

server.port=8088


spring.application.name=springboot-kafak

#kafka 配置
spring.kafka.bootstrap-servers=node01:9092


#producer peizhi

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#设置生产者每个批次需要放多少的数据
spring.kafka.producer.batch-size=16384

#生产者分区总 可用发送缓冲区大小,设置为32MB
spring.kafka.producer.buffer-memory=33554432




#consumer 配置
spring.kafka.consumer.key-serialization=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.consumer.value-serialization=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=springboot-consumer01


#设置offset  kafka找不到offset 将按照设置的最早的offset
spring.kafka.consumer.auto-offset-reset=earliest
#消费者的提交的方式 自动? 手动?   事务需要手动提交
spring.kafka.consumer.enable-auto-commit=true
#设置提交自动提交量的 时间
spring.kafka.consumer.auto-commit-interval=1000

2.3kafkaConfig  在这里面可以重写很多默认的参数,参数

@Configuration
public class kafkaConfig {

    //创建新的主题
    // ./kafka-topics.sh --bootstrap-server node01:2181 --list
    @Bean
    public NewTopic topic1(){
        return new NewTopic("nptc-01",1,(short)1);
    }


    //重新设置producer
    @Autowired
    @Bean
    public KafkaTemplate  kafkaTemplate(ProducerFactory producerFactory){

        HashMap configOverride = new HashMap<>();
        configOverride.put(ProducerConfig.BATCH_SIZE_CONFIG,20000);
 
        configOverride.put(ProducerConfig.ACKS_CONFIG,"all");
        //生产者发送数据最大消息的大小 1M
        configOverride.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
        //表示生产端消息发送失败时的重试次数,默认值为0
        configOverride.put(ProducerConfig.RETRIES_CONFIG,3);
        //表示生产端是否对消息进行压缩,默认值为none,即不压缩消息 综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。
        configOverride.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
        //表示生产端消息缓冲池或缓冲区的大小,默认值为33554432,即32M
        configOverride.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //表示生产端与broker之间的每个连接最多缓存的请求数,默认值为5,即每个连接最多可以缓存5个未响应的请求。这个参数通常用来解决分区乱序的问题  为了避免消息乱序问题,建议将该参数设置为1
        configOverride.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1);
        // 这个参数表示生产端发送请求后等待broker端响应的最长时间,默认值为30000
        configOverride.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,60000);


        KafkaTemplate template = new KafkaTemplate<>(producerFactory, true, configOverride);
        return template;
    }
}

2.4 kafkaSyncController 同步发送数据   可以默认的在kafka中创建topic

@Slf4j
@RestController
public class kafkaSyncController {

    @Autowired
    private KafkaTemplate template;


    @GetMapping("/sendData/sync/{message}")
    public  String  sendData(@PathVariable String message) throws ExecutionException, InterruptedException {

        ListenableFuture> future = template.send("springboot-kafka-01", 0, 0, message);

        //同步发送数据
        SendResult SendResult = future.get();
        String topic = SendResult.getRecordmetadata().topic();
        long offset = SendResult.getRecordmetadata().offset();
        int partition = SendResult.getRecordmetadata().partition();
        log.info("topic:{},offset:{},partition:{}" ,topic,offset,partition);

        return "success";

    }

}

2.5 kafkaAsyncController 异步发送数据 

@RestController
@Slf4j
public class kafkaAsyncController {

    @Autowired
    private KafkaTemplate template;

    @RequestMapping("/sentdata1/{message}")
    public String  sendData1(@PathVariable String message){
        ListenableFuture> send = template.send("springboot-kafka-02", 0, 1, message);
        send.addCallback(new ListenableFutureCallback>() {

             //设置回调函数 异步等待broker 端返回结果
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult result) {
                Recordmetadata metadata = result.getRecordmetadata();
                int partition = metadata.partition();
                long offset = metadata.offset();
                String topic = metadata.topic();
                log.info("发送消息成功:");
                log.info("topic:{},offset:{},partition:{}" ,topic,offset,partition);
            }
        });

       return "success";
    }

}

2.6  kafkaConsumer 

@Slf4j
@Component
public class kafkaConsumer {


    @KafkaListener(topics ="springboot-kafka-01" )
    public  void onMessage(ConsumerRecord record) {
     log.info("消费者收到的消息");
     log.info("topic:{},offset:{},partition:{},key:{},value:{}" ,
             record.topic(),
             record.offset(),
             record.partition(),
             record.key(),
             record.value());


    }


}

可以用postman 触发接口

http://localhost:8088//sentdata1/hello

可以看到输出台日志中已经接收到数据了.

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327088.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号