栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka漫谈(一)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka漫谈(一)

Kafka跟作家kafka的唯一关系在于,开发者喜欢kafka,实际上它是一种高吞吐量的分布式发布订阅消息系统。同样Rocketmq也是一种消息订阅系统,人们经常拿他们进行对比,Rocketmq可以详见该专栏RocketMQ漫谈:从简介到启动(一)。

文章目录
  • 一、安装
  • 二、QA问答
  • 三、整合kafka
  • 四、总结

下文主要围绕以下几点进行展开:

  • kafka能解决什么问题
  • 与Rocketmq对比
  • 常用概念介绍
  • 安装实操环境
一、安装

在介绍概念之前,我们先直接上手安装:

  1. 访问官网进行下载,http://kafka.apache.org/downloads。Tips:若你的机器环境是Windows,则不建议安装最新版本的kafka,会出现各类未知问题,这里使用2.1.1版本进行安装使用。

  1. 解压下载的tgz压缩文件,目录结构如下:
-rw-r--r-- 1 Vainycos 197121    32216 Feb  9  2019 LICENSE
-rw-r--r-- 1 Vainycos 197121      336 Feb  9  2019 NOTICE
drwxr-xr-x 1 Vainycos 197121        0 Feb  9  2019 bin/
drwxr-xr-x 1 Vainycos 197121        0 Feb  9  2019 config/
-rw-r--r-- 1 Vainycos 197121 67317760 Nov 18 14:34 kafka_2.11-2.1.1.tar
drwxr-xr-x 1 Vainycos 197121        0 Feb  9  2019 libs/
drwxr-xr-x 1 Vainycos 197121        0 Nov 18 15:28 logs/
drwxr-xr-x 1 Vainycos 197121        0 Feb  9  2019 site-docs/

主要关注bin目录和conf目录,其中bin目录下的windows文件夹对应windows下的环境;conf目录对应配置文件,采用默认配置暂时不做任何调整。

  1. 开始启动环节

​ 以下命令均在kafka根目录下执行。

  • 启动zk(由于kafka需要依赖zk,使用内置zk启动):启动完成后需保持黑框窗口不关闭

    .binwindowszookeeper-server-start.bat  .configzookeeper.properties
    
  • 启动kafka:新开一个黑框窗口运行,启动完成后需保持黑框窗口不关闭

    .binwindowskafka-server-start.bat .configserver.properties
    
  • 创建一个"Hello-Kafka"主题:新开一个黑框窗口运行,创建完毕后可关闭黑框窗口

    .binwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
    
  • 发送消息(生产者):新开一个黑框窗口运行,启动完成后需保持黑框窗口不关闭

    .binwindowskafka-console-producer.bat --broker-list localhost:9092 --topic Hello-Kafka
    
  • 接收消息(消费者):新开一个黑框窗口运行,启动完成后需保持黑框窗口不关闭

    .binwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic Hello-Kafka --from-beginning
    
  1. 测试消息发送

    上面的黑框窗口是发送消息,下面的黑框窗口是接收消息,可以看到消息被成功发送和接收了。

  2. 查看创建的主题

    .binwindowskafka-topics.bat --list --zookeeper localhost:2181
    
二、QA问答

Q:为什么启动kafka前需要先启动zookeeper?

kafka强依赖于zookeeper,没有zookeeper都没办法启动kafka,因为kafka将元数据的管理都丢给了zookeeper进行管理。(目前也开始在做低依赖zk的版本了,即Zookeeper-less Kafka,但是不建议直接在生产环境中使用,让子弹再飞一会儿,详见参考资料)

Q:创建主题Topic的意义?

在了解Topic之前还应该先接触broker。由于kafka支持集群部署,所以每一个部署的节点就是broker,即上面我们启动的一个kafka就是一个broker。而在每一个broker中还需要区分不同的Topic主题,可类比为发送人和接收人,且不同主题的信息互不干扰。

Q:使用Rocketmq还是kafka?

在之前的专栏中我就已经介绍过Rocketmq,也是一款优秀的消息中间件。

Rocketmq早期由阿里开发主导,使用的是java语言,对java生态支持较好,目前属于apache下的开源项目。

kafka也同样隶属于apache下的开源项目,由Scala和Java编写。

kafka经常被拿来与Rocketmq来作比较,论单机性能来说kafka更胜一筹,但是在数据可靠性上Rocketmq更优。

kafka仅能用作日志传输的说法不敢苟同,后期的kafka项目不仅仅满足于日志传输;而Rocketmq是阿里在实际业务需要的产物,自然有它的优点。

结论,没有银弹,视各自的业务需要做选择。

Q:能解决什么问题?

通过上面的测试收发消息,不就是一个简单的发送和接收吗,能解决什么实际需求呢。

同样类比于Rocketmq,kafka这一类的mq中间件其实能解决我们很多业务上的痛点,例如我们在某个业务中用户点击一个按钮进行某个流程周转,其中某一步操作需要给用户发送一条短信,而这一步操作如果需要等待短信发送是否成功将让用户觉得点一个按钮为什么要等待那么久,而我们引入mq则将等待较慢的业务进行解耦,我们只需要发一个消息给mq之后就可以流转其他流程了,因为mq会把产生的消息给消费掉,即发布订阅模型。消费时长决定于消息队列是否忙碌,一般情况下消费时长也挺短的;

还有一种情况是业务系统的高并发情况下,例如秒杀环境,上游订单流量瞬时增加,引入mq这一层即可以将订单消息先存起来,由消费者通过订阅模式进行处理。

以上总结其实就是一句话,生产者生成数据,将数据发送到一个缓存区域,消费者从缓存区域中消费数据。

三、整合kafka

项目示例地址:https://github.com/Vainycos/my_kafka.git(可以clone该仓库进行调试学习,但是更建议参考以下步骤自己动手实现一个)

  • 引入依赖

    
        org.springframework.kafka
        spring-kafka
    
    
  • 配置信息

    # Spring kafka 配置
    # 需先启动zk
    # 需先启动kafka
    spring:
      kafka:
        bootstrap-servers: localhost:9092
        producer:
          # 发生错误后,消息重发的次数。
          retries: 1
          #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
          batch-size: 16384
          # 设置生产者内存缓冲区的大小。
          buffer-memory: 33554432
          # 键的序列化方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 值的序列化方式
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
          # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
          # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
          acks: 1
        consumer:
          # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
          auto-commit-interval: 1S
          # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
          # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
          # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
          auto-offset-reset: earliest
          # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
          enable-auto-commit: false
          # 键的反序列化方式
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # 值的反序列化方式
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
          # 在侦听器容器中运行的线程数。
          concurrency: 5
          #listner负责ack,每调用一次,就立即commit
          ack-mode: manual_immediate
          missing-topics-fatal: false
    
  • 生产者

    package com.vainycos.mykafka.mq.producer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    
    import javax.annotation.Resource;
    
    
    @Component
    public class MyKafkaProducer {
    
        private Logger logger = LoggerFactory.getLogger(MyKafkaProducer.class);
    
        @Resource
        private KafkaTemplate kafkaTemplate;
    
        
        public static final String TOPIC_INVOICE = "Hello-Kafka";
    
        
        public ListenableFuture> sendMsg(String msg) {
            logger.info("发送MQ消息 topic:{} message:{}", TOPIC_INVOICE, msg);
            return kafkaTemplate.send(TOPIC_INVOICE, msg);
        }
    }
    
  • 消费者

    package com.vainycos.mykafka.mq.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    import org.springframework.util.Assert;
    
    import java.util.Optional;
    
    
    @Component
    public class KafkaConsumer {
    
        private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    
        @KafkaListener(topics = "Hello-Kafka", groupId = "Hello-Kafka")
        public void onMessage(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
            Optional message = Optional.ofNullable(record.value());
            // 1. 判断消息是否存在
            if (!message.isPresent()) {
                return;
            }
            // 2. 处理 MQ 消息
            try {
                Assert.isTrue(true, "消费成功!");
                // 3. 打印日志
                logger.info("消费MQ消息,完成 topic:{} 消费信息:{}", topic, message.get());
    
                // 4. 消息消费完成
                ack.acknowledge();
            } catch (Exception e) {
                // 消息重试,需要保证幂等。
                logger.error("消费MQ消息,失败 topic:{} message:{}", topic, message.get());
                throw e;
            }
        }
    }
    
  • 定时发送测试消息

    package com.vainycos.mykafka;
    
    import com.vainycos.mykafka.mq.producer.MyKafkaProducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    import javax.annotation.Resource;
    import java.time.LocalDateTime;
    
    
    @Component
    public class SendKafkaMsgTask {
    
        private Logger logger = LoggerFactory.getLogger(SendKafkaMsgTask.class);
    
        @Resource
        private MyKafkaProducer kafkaProducer;
    
        
        @Scheduled(cron = "0/5 * * * * ?")
        void sendMsg(){
            // 发送当前时间
            ListenableFuture> future = kafkaProducer.sendMsg(LocalDateTime.now().toString());
            future.addCallback(new ListenableFutureCallback>() {
    
                @Override
                public void onSuccess(SendResult stringObjectSendResult) {
                    // MQ 消息发送成功
                    logger.info("发送MQ消息成功 topic:{}", MyKafkaProducer.TOPIC_INVOICE);
                }
    
                @Override
                public void onFailure(Throwable throwable) {
                    // MQ 消息发送失败
                    logger.error("发送MQ消息失败 topic:{}", MyKafkaProducer.TOPIC_INVOICE);
                }
    
            });
        }
    }
    

    我们每隔5秒就用当前时间戳作为消息发送,在黑框控制台里就能看到发送成功的消息:

四、总结

以上我们就实现了一个简单的kafka消息中间件集成作为入门,实际业务过程中我们可以发送具体对象,通过json进行序列化发送,或者实现单独的解码编码直接传输对象。

参考资料:

  • KAFKA Windows环境下的问题- ERROR Failed to write meta.properties
  • Kafka为什么要抛弃ZooKeeper?
  • Kafka 不再需要 ZooKeeper
  • Kafka学习之路 (一)Kafka的简介
  • 技术选型:RocketMQ or Kafka
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582158.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号