栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

JavaEE:Kafka安装/配置/收发消息

JavaEE:Kafka安装/配置/收发消息

一、Kafka安装/配置:

前提,安装Zookeeper(IP+端口:192.168.233.147:2181):

https://blog.csdn.net/a526001650a/article/details/103667691

1.下载kafka_2.13-3.1.0.tgz:

http://mirrors.aliyun.com/apache/kafka/3.1.0/

2.用 Xftp将kafka_2.13-3.1.0.tgz 上传到/root 目录。

3.解压到/usr/local 目录:

[root@localhost ~]# tar -zxvf kafka_2.13-3.1.0.tgz -C /usr/local/

4.重命名(cd /usr/local目录):

[root@localhost local]# mv kafka_2.13-3.1.0 kafka

5.修改配置:

[root@localhost kafka]# vi /usr/local/kafka/config/server.properties

内容如下 :

broker.id=0
listeners=PLAINTEXT://192.168.233.147:9092              #此为本机IP+9092端口
advertised.listeners=PLAINTEXT://192.168.233.147:9092   #此为本机IP+9092端口
log.dirs=/usr/local/kafka/kafka-logs
num.partitions=1
zookeeper.connect=192.168.233.147:2181   #连接zookeeper地址
zookeeper.connection.timeout.ms=18000

6.创建Log 目录:

[root@localhost kafka]# mkdir /usr/local/kafka/kafka-logs

7.启动/停止(cd /usr/local/kafka/bin):

(1)启动:

[root@localhost bin]# ./kafka-server-start.sh /usr/local/kafka/config/server.properties &

(2)停止:

[root@localhost bin]# ./kafka-server-stop.sh

8.创建/删除topic:

(1)创建topic(cd /usr/local/kafka/bin目录,--partitions指定分区数量,--replication-factor指定副本集数量):

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.233.147:9092 --create --topic myTopic --partitions 1 --replication-factor 1

(2)显示某个topic详情:

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.233.147:9092 --describe --topic myTopic

(3)显示所有topic名称:

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.233.147:9092 -list

(4)显示所有topic详情:

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.233.147:9092 --describe

 (5)修改某个topic分区数量(--partitions后的数字必须比创建时大):

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.233.147:9092 --alter --topic myTopic --partitions 2

(6)删除某个topic:

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.233.147:9092 --delete --topic myTopic

9.查看/删除group(SpringBoot中接收者工程启动后能看到):

(1)查看所有group:

[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.233.147:9092 -list

(2)显示某个group详情(添加--state显示状态):

[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.233.147:9092 --describe --group myGroup

(3)显示某个group活动成员列表(添加--verbose显示分区):

[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.233.147:9092 --describe --group myGroup --members

(4)删除某个group(为空才能删除,可以一次删除多个: --group myGroup1 --group myGroup2):

[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.233.147:9092 --delete --group myGroup

10.发送/接收消息:

(1)发送消息(在换行的">"后面输入消息,按Ctrl+c退出):

[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.233.147:9092 --topic myTopic

(2)接收消息(在换行中显示接收的消息):

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.233.147:9092 --topic myTopic --from-beginning

(3)查看消费进度:

[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.233.147:9092 --describe --group myGroup

二、SpringBoot集成Kafka实现接收/发送消息:

1.消息接收(消费)端:

(1)pom文件导入依赖包:


    org.springframework.kafka
    spring-kafka

(2)配置Kafka,在application.yml中:

server:
  port: 10002
spring:
  kafka:
    bootstrap-servers: 192.168.233.147:9092  #配置kafka服务器地址
    listener:
      ack-mode: manual
      concurrency: 5  #并发数
    consumer:
      enable-auto-commit: false   #false消息为手动签收
      #消息接收者读取没有偏移量或无效时,latest:从最新开始读取消息。earliest:从最新开始读取消息
      auto-offset-reset: earliest
      #配置消息序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)实现消息接收(消费)代码:

@Component
public class ReceiverService { //消息接收(消费)端
    @KafkaListener(groupId = "myGroup", topics = "myTopic")
    public void onMsg(ConsumerRecord record, Acknowledgment ack, Consumer receiver){
    //...处理接收消息后的逻辑
        Object msg = record.value();  //接收到的消息体
        ack.acknowledge();  //手动签收
    }
}

2.消息发送(生产)端:

(1)pom文件导入依赖包:


    org.springframework.kafka
    spring-kafka

(2)配置Kafka,在application.yml中:

server:
  port: 10001
spring:
  kafka:
    bootstrap-servers: 192.168.233.147:9092  #配置kafka服务器地址
    producer:
      retries: 0    #发送失败重试次数
      batch-size: 16384   #批量发送
      buffer-memory: 33554432   #生产者内存为32M
      #配置消息序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #0时生产者在写入消息前不会等待任何服务器的响应
      #1时集群leader节点收到消息时,生产者就会收到服务器的响应
      #-1时分区leader必须等待消息写入到所有ISR副本才认为生产者请求成功。
      acks: 1

(3)实现消息发送(生产)代码:

@Service
public class SenderService {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    public void sendMsg(String topic, String msg){
        ListenableFuture> future = kafkaTemplate.send(topic, msg);
        future.addCallback(new ListenableFutureCallback>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败回调
            }
            @Override
            public void onSuccess(SendResult stringObjectSendResult) {
                //发送成功回调
            }
        });
    }
}

(4)测试消息发送:

@RestController
public class SenderController {
    @Autowired
    private SenderService senderService;
    @GetMapping("/sendMsg")
    public String sendMsg() {
        senderService.sendMsg("myTopic", "发送的消息,time: " + System.currentTimeMillis());
        return "发送消息";
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/748669.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号