
A Simple Tutorial on Integrating Kafka with Spring Boot


1. Add the Required Dependencies (Maven)

The project is built with Maven; the POM is as follows:



    
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.8.RELEASE</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>springboot-kafka</artifactId>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2. Setup Steps

Step 1: Add the configuration file

The configuration in application.properties is as follows:

spring.application.name=springboot-kafka-02
server.port=8080
# Broker address used for the initial connection
spring.kafka.bootstrap-servers=192.168.42.21:9092
# Key/value serializer classes used by the producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Default batch size in bytes
spring.kafka.producer.batch-size=16384
# 32 MB total send buffer
spring.kafka.producer.buffer-memory=33554432
# Key/value deserializer classes used by the consumer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group id
spring.kafka.consumer.group-id=spring-kafka-consumer-02
# Whether to auto-commit consumer offsets
spring.kafka.consumer.enable-auto-commit=true
# Commit offsets to the broker every 100 ms
spring.kafka.consumer.auto-commit-interval=100
# If no committed offset exists for this group, start from the earliest offset
spring.kafka.consumer.auto-offset-reset=earliest
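For reference, the spring.kafka.producer.* keys above map onto the Kafka client's own configuration names. A minimal sketch of the raw producer Properties that Spring Boot assembles from them (simplified; the auto-configuration does considerably more than this):

```java
import java.util.Properties;

public class ProducerProps {

    // Sketch of the native Kafka producer config that the
    // spring.kafka.producer.* properties above translate into.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.21:9092"); // spring.kafka.bootstrap-servers
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", "16384");       // spring.kafka.producer.batch-size
        props.put("buffer.memory", "33554432"); // spring.kafka.producer.buffer-memory
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("batch.size"));
    }
}
```

These native names (batch.size, buffer.memory, and so on) are what appears in the ProducerConfig dump shown in the test section below.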
Step 2: Add the project package structure

Step 3: Add the Spring Boot main class
package com.kafka.learn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}
Step 4: Add the Kafka configuration class
package com.kafka.learn.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class KafkaConfig {

    
    // topic1: 5 partitions, replication factor 1
    @Bean
    public NewTopic topic1() {
        return new NewTopic("topic1", 5, (short) 1);
    }

    // topic2: 3 partitions, replication factor 1
    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic2", 3, (short) 1);
    }
}
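The partition counts chosen here matter because Kafka routes each keyed record by hashing its key over the partition count. A simplified sketch of that routing rule (the real DefaultPartitioner hashes the serialized key bytes with murmur2; plain hashCode is used here only to illustrate the idea):

```java
public class PartitionSketch {

    // Simplified stand-in for Kafka's DefaultPartitioner: a keyed record
    // goes to hash(key) mod numPartitions, so records with the same key
    // always land on the same partition. The & 0x7fffffff mirrors how the
    // real partitioner forces the hash positive.
    public static int partitionFor(int key, int numPartitions) {
        return (Integer.hashCode(key) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // topic1 above is created with 5 partitions
        System.out.println(partitionFor(42, 5));
        System.out.println(partitionFor(42, 5)); // same key, same partition
    }
}
```

This is why increasing the partition count of an existing topic reshuffles key-to-partition assignments.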
Step 5: Add the Kafka message producer

Add message-producing endpoints in the controller layer, covering both synchronous and asynchronous sending.

The synchronous producer endpoint:

package com.kafka.learn.controller;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;

import java.util.concurrent.ExecutionException;


@RestController
@RequestMapping("/kafka/sync")
public class KafkaSyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping(value = "/send/{message}", method = RequestMethod.GET)
    public String sendSync(@PathVariable("message") String message) {
        ListenableFuture<SendResult<Integer, String>> future = template.send(new ProducerRecord<>(
                "topic-spring-02",
                0,       // partition
                1,       // key
                message  // value
        ));

        try {
            // Block until the broker acknowledges the send
            SendResult<Integer, String> result = future.get();

            System.out.println(result.getRecordMetadata().topic() + "\t"
                    + result.getRecordMetadata().partition() + "\t"
                    + result.getRecordMetadata().offset());

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        return "success";
    }

    @RequestMapping(value = "/{topic}/send", method = RequestMethod.GET)
    public void sendMessageToTopic(@PathVariable("topic") String topic,
                                   @RequestParam(value = "partition", defaultValue = "0") int partition) {
        System.out.println("Sending message to Kafka topic: " + topic);
        // The partition number doubles as the record key here
        template.send(topic, partition, partition, "你好,kafka");
    }
}
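One caveat with the synchronous pattern: future.get() with no arguments blocks the request thread until the broker responds. A sketch of the bounded-wait variant, using a plain CompletableFuture as a stand-in for the template's ListenableFuture:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncSendSketch {

    // Stand-in for template.send(...): completes asynchronously with a result
    static CompletableFuture<String> send(String message) {
        return CompletableFuture.supplyAsync(() -> "ack:" + message);
    }

    public static void main(String[] args) throws Exception {
        try {
            // Bounded wait: fail fast instead of tying up the request
            // thread indefinitely when the broker is unreachable
            String result = send("hello").get(5, TimeUnit.SECONDS);
            System.out.println(result);
        } catch (TimeoutException e) {
            System.out.println("broker did not respond in time");
        }
    }
}
```

The same get(timeout, unit) overload exists on the future returned by KafkaTemplate.send, so the controller above can apply the identical guard.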

The asynchronous producer endpoint:

package com.kafka.learn.controller;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("/kafka/async")
public class KafkaAsyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("/send/{message}")
    public String asyncSend(@PathVariable("message") String message) {

        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "topic-spring-02",
                0,       // partition
                3,       // key
                message  // value
        );

        ListenableFuture<SendResult<Integer, String>> future = template.send(record);

        // Register a callback and handle the broker's response asynchronously
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("Send failed: " + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                System.out.println("Send succeeded: "
                        + result.getRecordMetadata().topic() + "\t"
                        + result.getRecordMetadata().partition() + "\t"
                        + result.getRecordMetadata().offset());
            }
        });
        return "success";
    }
}
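The callback style keeps the request thread free instead of blocking on the broker. The same pattern, sketched with the JDK's own CompletableFuture standing in for template.send() (the names and payload here are illustrative, not Spring Kafka API):

```java
import java.util.concurrent.CompletableFuture;

public class AsyncCallbackSketch {

    // Format the outcome, mirroring the onSuccess/onFailure pair above
    static String describe(String result, Throwable throwable) {
        return throwable != null
                ? "send failed: " + throwable.getMessage()
                : "send ok: " + result;
    }

    public static void main(String[] args) {
        // Stand-in for template.send(record): an operation that completes
        // asynchronously; whenComplete plays the role of the registered
        // ListenableFutureCallback.
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "topic-spring-02-0@7");

        future.whenComplete((result, throwable) ->
                        System.out.println(describe(result, throwable)))
              .join(); // demo only: wait so the JVM does not exit early
    }
}
```

In the controller, the method returns "success" immediately; delivery failures surface only in the callback, so anything that must react to them belongs there.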
Step 6: Add the Kafka message consumer
package com.kafka.learn.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.Optional;


@Component
public class MyConsumer {

    // Consume every partition of topic-spring-02
    @KafkaListener(topics = "topic-spring-02")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        Optional<ConsumerRecord<Integer, String>> optional =
                Optional.ofNullable(record);
        if (optional.isPresent()) {
            System.out.println(
                    record.topic() + "\t"
                            + record.partition() + "\t"
                            + record.offset() + "\t"
                            + record.key() + "\t"
                            + record.value());
        }
    }

    // Consume only the listed partitions; for topic2, partition 1 starts at offset 4
    @KafkaListener(id = "listen01",
            topicPartitions = {
                    @TopicPartition(topic = "topic1", partitions = { "0", "3" }),
                    @TopicPartition(topic = "topic2", partitions = "0",
                            partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4"))
            })
    public void listen(ConsumerRecord<Integer, String> record) {
        System.out.println("topic: " + record.topic());
        System.out.println("key: " + record.key());
        System.out.println("value: " + record.value());
    }

}
3. Test Results

Test synchronous sending via GET http://localhost:8080/kafka/sync/send/你好,kafka

Backend console output:

2021-11-12 10:54:20.143  INFO 20160 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-12 10:54:20.144  INFO 20160 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-11-12 10:54:20.150  INFO 20160 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 6 ms
2021-11-12 10:54:20.186  INFO 20160 --- [nio-8080-exec-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [192.168.42.21:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2021-11-12 10:54:20.203  INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2021-11-12 10:54:20.203  INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2021-11-12 10:54:20.203  INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1636685660203
2021-11-12 10:54:20.213  INFO 20160 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: BNUXbtH2RYmLyhUrarFpBw
topic-spring-02	0	1	1	你好,kafka
topic-spring-02	0	1

Test asynchronous sending via GET http://localhost:8080/kafka/async/send/你好async,kafka

Backend console output:

Sending message to Kafka topic: topic1
topic: topic1
key: 3
value: 你好,kafka

Test sending to a partition outside the consumer's assignments: GET http://localhost:8080/kafka/sync/topic1/send?partition=4

Backend console output (no consumption log):

Sending message to Kafka topic: topic1

Test synchronous sending to topic2 on a partition the consumer is assigned to:

        GET http://localhost:8080/kafka/sync/topic2/send?partition=0

Backend console output:

Sending message to Kafka topic: topic2
topic: topic2
key: 0
value: 你好,kafka
Summary

Integrating Kafka with Spring Boot is straightforward: most of the configuration is supplied by the Spring Kafka starter, so only a handful of required parameters need to be set. If producing and consuming must be constrained by business rules, some extra wrapping is needed, for example a custom serialization scheme, or a producer that wraps messages in your own payload type to restrict what may be sent.

This tutorial uses a single-broker setup. For a cluster, review the Kafka-related configuration parameters and tune them to your own workload.
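As an illustration of the custom serialization mentioned above, here is a sketch of the byte-level logic such a serializer performs. The Order type and the hand-rolled JSON layout are hypothetical; in a real project this logic would sit inside an implementation of org.apache.kafka.common.serialization.Serializer, typically backed by a JSON library:

```java
import java.nio.charset.StandardCharsets;

public class OrderSerializerSketch {

    // Hypothetical payload type, for illustration only
    static class Order {
        final int id;
        final String item;
        Order(int id, String item) { this.id = id; this.item = item; }
    }

    // Core of a custom value serializer: turn the object into bytes.
    // In a real project this would be the body of
    // Serializer<Order>#serialize(String topic, Order data).
    static byte[] serialize(Order order) {
        String json = "{\"id\":" + order.id + ",\"item\":\"" + order.item + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new Order(1, "book"));
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

The serializer class is then wired in via spring.kafka.producer.value-serializer in place of StringSerializer, with a matching deserializer on the consumer side.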

Reprinted from www.mshxw.com. Original article: https://www.mshxw.com/it/490621.html