栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka 入门

Kafka 入门

Kafka概念

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Kafka名词解释
名词解释
BrokerMQ服务器端,消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
ProducerMQ消息生产者,向Broker服务端发送消息的客户端
ConsumerMQ消息消费者,从Broker服务端读取消息的客户端
Consumer Group消费者组内每个消费者负责消费不同分区的数据,每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Partition分区存放消息,一个topic可以分为多个partition
Offset标记我们消费者消费的位置
Kafka 命令 创建topic
  • zookeeper:kafka 连接的zookeeper地址
  • replication-factor:用来设置主题的副本数,备份因子
  • partitions 分区数量
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
查看主题
  • bootstrap-server:kafka Broker地址
.kafka-topics.bat --list --bootstrap-server 127.0.0.1:9092
删除主题
  • bootstrap-server:kafka Broker地址
  • topic:topic名称
./kafka-topics.bat --delete --bootstrap-server 127.0.0.1:9092 --topic test
发送消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test 
>send a Msg
消费消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
SpringBoot 集成 maven 依赖
 
    org.springframework.kafka
     spring-kafka
     2.2.4.RELEASE
 
 
     org.apache.kafka
     kafka-clients
     2.1.0
 
消费者
private void send(String key, String data) {
    // topic 名称 key data 消息数据
    kafkaTemplate.send("topicName", key, data);

}
生产者
@KafkaListener(topics = "topicName", groupId = "group1")
public void receive01(ConsumerRecord consumer) {
    log.info(">topic名称:{},,key:{},分区位置:{},offset{},value:{}<",
            consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), consumer.value());
}
配置文件
# kafka
spring:
  kafka:
    # kafka服务器地址(可以多个)
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: group1
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
Offset Reset 三种模式
  • earliest(最早):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest(最新):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none(没有):topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/433972.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号