栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Windows环境下Springboot 集成Kafka

Windows环境下Springboot 集成Kafka

Windows环境下Springboot 集成Kafka

由于kafka是基于zookeeper运行的,在kafka2.x的版本中,自带了zk,本文介绍的是基于2.X版本的集成

kafka下载地址

https://kafka.apache.org/downloads

下载完成后解压到指定目录,如果用自带的路径,后面命令后会报集群属性异常,因为自带的路径是不确定的,需要修改配置文件
修改/config文件下的 zookeeper.properties 和 server.properties
zookeeper.properties文件:

dataDir=D:kafkakafka_2.12-3.1.0tempzookeeper

server.properties文件:

log.dirs=D:kafkakafka_2.12-3.1.0tempkafka

启动kafka时,需要启动三个cmd命令窗口,因为不是后台启动,所以窗口不能关闭
zookeeper进入windows目录执行:
zookeeper-server-start.bat …/…/config/zookeeper.properties

kafka 进入windows目录执行:
kafka-server-start.bat …/…/config/server.properties

新建窗口用来执行 创建topic
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic demo
PS:2.X以上的版本由原先的 --zookeeper localhost:2181 更改为现在的 --bootstrap-server localhost:9092
硬性装备已经准备完成了,现在开始集成SpringBoot

引入依赖

   
          org.springframework.kafka
          spring-kafka
   

修改appliaction后缀为.yml

server:
 port: 8080
spring:
 application:
  name: springboot-kafka
 kafka:
  #kafka的配置
  bootstrap-servers: 127.0.0.1:9092
  consumer:
   auto-commit-interval: 1000  #多长时间提交偏移量,自动提交
   auto-offset-reset: earliest   #设置偏移量 吐过kafaka中找不到当前消费者的偏移量,则直接将偏移量重置为最早的
   enable-auto-commit: true   #消费者的偏移量是自动提交还是手动提交。此处自动提交偏移量
   group-id: springboot-consumer02
   key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
   value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  producer:
   batch-size: 16384 #生产者每个批次放多少生产者
   buffer-memory: 33554432  #生产者一端总的可用发送缓冲区大小、此处设置为32mb
   key-serializer: org.apache.kafka.common.serialization.StringSerializer
   value-serializer: org.apache.kafka.common.serialization.StringSerializer

创建生产者

@RestController
public class KafkaSyncProducerController {

    @Resource
    private KafkaTemplate stringKafkaTemplate;

    //同步发送消息
    @RequestMapping("/kafka/sync/{msg}")
    public String sendMSG(@PathVariable("msg")String msg){

        stringKafkaTemplate.send("demo", msg);
        return "success";
    }
    
}

创建消费者

@Component
public class KafkaConsumerController {

    //设置监听器
    @KafkaListener(topics = "demo")
    public void listener(ConsumerRecord record){
        System.out.println("消费者消费到的消息:"+record.topic()+","+record.value());
    }
}

发送请求

检查是否完成消费

参考文章:https://blog.csdn.net/zhaoyy0513/article/details/103904778

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745255.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号