
Integrating Kafka 2.8 with Spring Boot 2.6

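Contents

1 Summary
2 Core Maven dependencies
3 Kafka service configuration
3 Message producer (Producer)
    3.1 application configuration file
    3.2 Producer: sending messages
    3.3 Topic definition and message entity
    3.4 Sending example (Controller)
    3.5 Spring Boot application class
4 Message consumer (Consumer)
    4.1 application configuration file
    4.2 Consumer: receiving messages
5 Testing
6 Recommended references
7 GitHub source code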

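1 Summary

Kafka is a stream-processing platform that is widely used for big-data workloads, and it can also serve as a message queue. This article shows how to integrate Kafka 2.8 with Spring Boot 2.6.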

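2 Core Maven dependencies
./demo-kafka-producer/pom.xml
./demo-kafka-consumer/pom.xml
        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

Spring Boot is at version 2.6.3 and spring-kafka is at version 2.8.2.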

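Full dependencies

    <dependencies>
        <!-- web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${springboot.version}</version>
        </dependency>
        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- Hutool utilities -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>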

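Corresponding versions

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <springboot.version>2.6.3</springboot.version>
        <hutool.version>5.7.21</hutool.version>
    </properties>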
3 Kafka service configuration

Core Kafka broker configuration

server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://your.publicIP:9092
# A comma separated list of directories under which to store log files
log.dirs=/var/log/kafka/logs-1
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=127.0.0.1:2181

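Make sure the Kafka service can be reached from outside the broker host: advertised.listeners must be set to an address that the producer and consumer applications can connect to.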
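
Before starting the Spring Boot services, it can help to confirm that the broker port accepts connections from the machine running them. The following is a minimal sketch using only the JDK; the class name BrokerReachability and the timeout value are illustrative assumptions, not part of the original project. It only proves that the TCP port is open, not that advertised.listeners is configured correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port can be opened within a timeout. */
final class BrokerReachability {

    private BrokerReachability() {
    }

    /**
     * Returns true if a TCP connection to host:port succeeds within timeoutMillis.
     * Any I/O failure (refused, timed out, unknown host) is reported as false.
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For the address used later in this article, the call would be BrokerReachability.isReachable("172.16.140.10", 9092, 3000). A false result usually points at a firewall, a wrong listener address, or a broker that is not running.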

3 Message producer (Producer)

3.1 application configuration file
./demo-kafka-producer/src/main/resources/application.yml
## config

## server
server:
  port: 8900

## spring
spring:
  application:
    name: demo-kafka-producer
  kafka:
    bootstrap-servers: 172.16.140.10:9092
    consumer:
      group-id: 1
      enable-auto-commit: true
      auto-commit-interval: 100ms
      properties:
        session.timeout.ms: 15000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

3.2 Producer: sending messages
./demo-kafka-producer/src/main/java/com/ljq/demo/springboot/kafka/producer/common/mq/KafkaMQProducer.java
package com.ljq.demo.springboot.kafka.producer.common.mq;

import cn.hutool.json.JSONUtil;
import com.ljq.demo.springboot.kafka.producer.common.constant.KafkaMessageConst;
import com.ljq.demo.springboot.kafka.producer.model.entity.KafkaMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;


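/**
 * Kafka message producer.
 */
@Slf4j
@Component
public class KafkaMQProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Wraps the text in a KafkaMessageEntity and sends it to the demo topic as JSON.
     *
     * @param msg message text
     */
    public void send(String msg) {
        KafkaMessageEntity kafkaMessage = new KafkaMessageEntity();
        kafkaMessage.setId("message-" + System.currentTimeMillis());
        kafkaMessage.setMessage(msg);
        // Serialize once and reuse the JSON for both the log line and the record value.
        String payload = JSONUtil.toJsonStr(kafkaMessage);
        log.info("kafka message: {}", payload);
        kafkaTemplate.send(KafkaMessageConst.KAFKA_TOPIC_DEMO, payload);
    }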

}
3.3 Topic definition and message entity

Topic constants class

./demo-kafka-producer/src/main/java/com/ljq/demo/springboot/kafka/producer/common/constant/KafkaMessageConst.java
package com.ljq.demo.springboot.kafka.producer.common.constant;


public class KafkaMessageConst {

    private KafkaMessageConst() {
    }

    public static final String KAFKA_TOPIC_DEMO = "kafka_topic_demo";


}

Message entity class

./demo-kafka-producer/src/main/java/com/ljq/demo/springboot/kafka/producer/model/entity/KafkaMessageEntity.java
package com.ljq.demo.springboot.kafka.producer.model.entity;

import lombok.Data;

import java.io.Serializable;


@Data
public class KafkaMessageEntity implements Serializable {

    private static final long serialVersionUID = -3812375964256200394L;

    private String id;

    private String message;

}
3.4 Sending example (Controller)
./demo-kafka-producer/src/main/java/com/ljq/demo/springboot/kafka/producer/controller/KafkaMessageController.java
package com.ljq.demo.springboot.kafka.producer.controller;

import com.ljq.demo.springboot.kafka.producer.common.mq.KafkaMQProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@Slf4j
@RestController
@RequestMapping("/api/demo/kafka")
public class KafkaMessageController {

    @Autowired
    private KafkaMQProducer kafkaMQProducer;

    
    /**
     * Sends the given message to Kafka and echoes it back in the response.
     */
    @PostMapping(value = "/send", produces = {MediaType.APPLICATION_JSON_VALUE})
    public ResponseEntity<String> send(String message) {
        log.info("request path: {}, param: {}", "/api/demo/kafka/send", message);
        kafkaMQProducer.send(message);
        return ResponseEntity.ok(message);
    }


}
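3.5 Spring Boot application class

The topic needs to be created before messages are sent; here it is declared as a bean in the application class.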

./demo-kafka-producer/src/main/java/com/ljq/demo/springboot/kafka/producer/DemoKafkaProducerApplication.java
package com.ljq.demo.springboot.kafka.producer;

import com.ljq.demo.springboot.kafka.producer.common.constant.KafkaMessageConst;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;


@SpringBootApplication
public class DemoKafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoKafkaProducerApplication.class, args);
    }


    @Bean
    public KafkaAdmin.NewTopics topics() {
        return new KafkaAdmin.NewTopics(TopicBuilder.name(KafkaMessageConst.KAFKA_TOPIC_DEMO).build());
    }
}

spring-kafka 2.8 supports creating several topics at once, and the code for doing so is concise.
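
The application class above declares a single topic. As a sketch of what declaring several topics in one bean might look like, the configuration class below passes two topics to KafkaAdmin.NewTopics. The class name KafkaTopicConfig, the second topic name, and the partition and replica counts are assumptions made for illustration; the snippet needs the spring-kafka dependency from section 2 on the classpath and would replace the topics() bean rather than sit next to it.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

/** Hypothetical configuration class declaring two topics in a single bean. */
@Configuration
public class KafkaTopicConfig {

    @Bean
    public KafkaAdmin.NewTopics demoTopics() {
        return new KafkaAdmin.NewTopics(
                // Topic used throughout this article, created with broker defaults.
                TopicBuilder.name("kafka_topic_demo").build(),
                // Assumed second topic; name, partitions and replicas are examples only.
                TopicBuilder.name("kafka_topic_demo_second")
                        .partitions(3)
                        .replicas(1)
                        .build());
    }
}
```

Keeping all topics in one bean puts the topic layout in a single place, at the cost of coupling topics that may belong to different features.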

4 Message consumer (Consumer)

4.1 application configuration file
./demo-kafka-consumer/src/main/resources/application.yml
## config

## server
server:
  port: 8901

## spring
spring:
  application:
    name: demo-kafka-consumer
  kafka:
    bootstrap-servers: 172.16.140.10:9092
    consumer:
      group-id: 1
      enable-auto-commit: true
      auto-commit-interval: 100ms
      properties:
        session.timeout.ms: 15000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
4.2 Consumer: receiving messages
./demo-kafka-consumer/src/main/java/com/ljq/demo/springboot/kafka/consumer/common/mq/KafkaMQConsumer.java
package com.ljq.demo.springboot.kafka.consumer.common.mq;

import cn.hutool.json.JSONUtil;
import com.ljq.demo.springboot.kafka.consumer.common.constant.KafkaMessageConst;
import com.ljq.demo.springboot.kafka.consumer.model.entity.KafkaMessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;


@Slf4j
@Component
public class KafkaMQConsumer {


    
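    /**
     * Receives records from the demo topic and logs the decoded entity.
     *
     * @param record record whose value is the JSON string sent by the producer service
     */
    @KafkaListener(topics = {KafkaMessageConst.KAFKA_TOPIC_DEMO})
    public void receive(ConsumerRecord<String, String> record) {
        log.info("record: {}", record);
        // The value may be null, so guard before parsing it as JSON.
        Optional.ofNullable(record.value())
                .ifPresent(message ->
                        log.info("message: {}", JSONUtil.toBean(message, KafkaMessageEntity.class)));
    }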

}
5 Testing

Start the producer service and the consumer service.

Call the send endpoint.

Endpoint URL:

http://127.0.0.1:8900/api/demo/kafka/send?message=kakakakak

Request method: POST
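
The request can be sent with any HTTP client. As one option, the sketch below issues the POST with the JDK's HttpURLConnection, which is available on the Java 1.8 target used by this project. The class name SendEndpointClient and the timeout values are assumptions for illustration; the path and the message parameter come from the controller in section 3.4, which echoes the message back in the response body.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

/** Hypothetical client for the /api/demo/kafka/send endpoint. */
final class SendEndpointClient {

    private SendEndpointClient() {
    }

    /** POSTs the message as a query parameter and returns the response body. */
    static String send(String baseUrl, String message) throws IOException {
        // URL-encode the value so spaces and non-ASCII text survive the query string.
        String query = "message=" + URLEncoder.encode(message, "UTF-8");
        URL url = new URL(baseUrl + "/api/demo/kafka/send?" + query);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(5000);
        // Send an explicit empty body; the parameter travels in the URL.
        conn.setDoOutput(true);
        conn.getOutputStream().close();
        // getInputStream() throws an IOException for 4xx and 5xx responses.
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toString("UTF-8");
        } finally {
            conn.disconnect();
        }
    }
}
```

With the producer service running on port 8900, SendEndpointClient.send("http://127.0.0.1:8900", "kakakakak") would issue the same request as the URL above.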

Producer log output:

2022-03-01 18:27:53 | INFO  | http-nio-8900-exec-6 | com.ljq.demo.springboot.kafka.producer.controller.KafkaMessageController 33| request path: /api/demo/kafka/send, param: kakakakak
2022-03-01 18:27:53 | INFO  | http-nio-8900-exec-6 | com.ljq.demo.springboot.kafka.producer.common.mq.KafkaMQProducer 32| kafka message: {"message":"kakakakak","id":"message-1646130473195"}

Consumer log output:

2022-03-01 18:27:53 | INFO  | org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 | com.ljq.demo.springboot.kafka.consumer.common.mq.KafkaMQConsumer 29| record: ConsumerRecord(topic = kafka_topic_demo, partition = 0, leaderEpoch = 0, offset = 5279, CreateTime = 1646130473195, serialized key size = -1, serialized value size = 52, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = {"message":"kakakakak","id":"message-1646130473195"})
2022-03-01 18:27:53 | INFO  | org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 | com.ljq.demo.springboot.kafka.consumer.common.mq.KafkaMQConsumer 32| message: KafkaMessageEntity(id=message-1646130473195, message=kakakakak)

This completes the integration of Kafka with Spring Boot.

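6 Recommended references

Official documentation: Apache Kafka Quickstart

Integrating Kafka with Spring Boot

How to Work with Apache Kafka in Your Spring Boot Application

Spring for Apache Kafka

A collection of common Kafka errors

Spring Boot project reports UnknownHostException when connecting to a remote Kafka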

7 GitHub source code

GitHub repository: https://github.com/Flying9001/springBootDemo

