栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

KafKa(1)

KafKa(1)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

Kafka一、消息队列

1.概念2.作用

解耦异步削峰使用场景 3.两种模式

3.1.点对点模式3.2 发布/订阅模式 二、mq框架

1.kafka简介2.基本术语3.kafka集群的搭建,启动和关闭

先搭建zookeeper集群搭建kafka 集群 4.集群的启动和关闭5.常用命令参数说明:常见问题


Kafka
一、消息队列 1.概念

消息是在两台计算机之间传递的数据单位,它可以是简单的字符串,也可以是复杂的嵌入对象。消息队列是消息传递过程中保存消息的容器,将消息从源头中继到目标时充当中间人的角色。

2.作用 解耦

A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果
C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统
产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一
条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从
MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统
压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超
时等情况。
就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但 是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。

异步

A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库 要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。
用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应 给用户,总时长是 3 + 5 = 8ms。

削峰

减少高峰时期对服务器压力。 上游系统性能好,压力突然增大,下游系统性能稍差,承受不了突然增大的压力,这时候消息中间件就起到了削峰的作用。

把瞬间的数据通过延迟来虚弱对接口的压力

使用场景

当系统中出现生产和消费的速度和稳定性等因素不一致的时候,使用消息队列,作为中间层,来弥合双方的差异。

3.两种模式 3.1.点对点模式 3.2 发布/订阅模式 二、mq框架

常见的mq框架有:kafka activeMQ rabbitMQ zeroMQ metaMQ rocketMQ等等。。。

1.kafka简介

官网地址
kafka是由apache软件基金会开发的一个开源流处理框架,由JAVA和scala语言编写。是一个高吞吐量的分布式的发布和订阅消息的一个系统。Kafka® 用于构建实时的数据管道和流式的app.它可以水平扩展,高可用,速度快,并且已经运行在数千家公司的生产环境。

2.基本术语

topic(话题): kafka将消息分门别类,每一类的消息称之为话题,是逻辑上的一个概念,如果是真正到磁盘上,映射的是一个partition的一个目录

生产者(producer): 发布消息的对象称之为生产者,只负责数据的产生,生产的来源,可以不在kafka集群上,而是来自其他的业务系统。

消费者(consumer): 订阅消息并处理发布消息的对象,称为消费者。

消费者组consumerGroup: 多个消费者可以构成消费者组,同一个消费者组的消费者,只能消费一个topic数据,不能重复消费。
broker(中间件): kafka本身可以是一个集群,集群中的每一个服务器都是一个代理,这个代理称为broker。只负责消息的存储,不管生产者和消费者没有任何关系。在集群中每个broker有唯一个ID,不能重复。

3.kafka集群的搭建,启动和关闭 先搭建zookeeper集群 搭建kafka 集群

场景 三台虚拟机
cluster1 192.168.106.130
cluster2 192.168.106.131
cluster3 192.168.106.132
卡夫卡
kafka_2.12-2.7.0.tgz

    上传kafka压缩包,到linux系统上

    解压缩: tar -xzvf /root/software/kafka_2.12-2.7.0.tgz -C /usr/

    修改名称: mv /usr/kafka_2.12-2.7.0/ /usr/kafka 方便管理 如果更换版本不在需要改环境变量

    配置环境变量:
    vim /etc/profile

    注意 让配置文件生效 source /etc/profile

    测试 输出 echo $KAFKA_HOME 有无输出

    进入卡夫卡目录 cd /usr/kafka

    创建目录 存放信息的,为后面配置 准备 mkdir logs

    修改配置文件 server.properties文件:
    vim /usr/kafka/config/server.properties
    修改内容:

    #broker的全局唯一编号 不能重复 21 行 borker,id=0#是否邮箱删除topic 22 行 delete.topic.enble=true#处理网络请求和响应的线程数量 42行 num.network.threads=3#用来处理磁盘IO 的线程数量 45 num.iothreads=848 行 #发送套接字的缓冲区大小 socket.send.buffer.bytes=10240051行 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=10240054行 # 请求套接字的最大缓冲区大小 socket.request.max.bytes=10285760060行 #kafka运行日志存放的路径 log.dirs=/usr/kafka/logs65 行 #topic在当前broker上的分区个数 num.partitions=169 行 #用来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir=1#以下配置控制日志段的处理。可以将策略设置为在一段时间后或在给定大小累积后删除段。只要满足这些条件中的任一项,就会删除段。删除总是从日志的末尾开始。103 行 #segment文件保留的最长时间,超时将被删除,单位小时,默认是168小时,也就是7天 log.retention.hours=168#基于大小的日志保留策略。除非剩余的段低于log.retention.bytes,否则将从日志中删除段。独立于log.retention.hours的功能 #log.retention.bytes=10737:41824#日志段文件的最大大小。当达到此大小时,将创建一个新的日志段。log.segment.bytes=1073741824#检查日志段以查看是否可以根据保留策略删除日志段的间隔 log.retention.check.interval.ms=300000123 行 #配置连接Zookeeper集群地址zookeeper.connect=cluster1:2181,cluster2:2181,cluster3:2181
    因为配置文件中使用的zk主机名称链接,所以配置本地域名:vim /etc/hosts

    修改producer.properties: vim config/producer.properties修改21行为:
    bootstrap.servers=cluster1:9092:cluster2:9092cluster3:9092

    修改consumer.properties:

    发送配置好的kafka到另外两台机子(先做免密登录):
    ssh-keygen -t rsa
    ssh-copy-id kafka2
    ssh-copy-id kafka3

scp -r kafka/ kafka2:/usr/
scp -r kafka/ kafka3:/usr/
ls /usr
修改broker.id(切记)在kafka2和kafka3上修改broker.id
vim config/server.properties
修改21行为
分别修改 1 和2
broker.id=1 broker.id=2

    在all session执行:
    source /etc/profile
    echo $KAFKA_HOME
    发送hosts配置文件:
    scp -r /etc/hosts cluster2:/etc/
    scp -r /etc/hosts cluster3:/etc/测试是否成功: 在all session执行:
    ping kakfa1
4.集群的启动和关闭

启动kafka之前一定要保证zk在启动,并且可用:
启动zk:
启动kafka:

5.常用命令

查看当前服务器中的所有topic主题:

kafka-topics.sh --zookeeper cluster1:2181 --list
如果是zk集群可以使用这样的命令:
kafka-topics.sh --zookeeper cluster1:2181,cluster2:2181,cluster3:2181 --list
创建topic:
kafka-topics.sh --zookeeper cluster2:2181 --create --replication-factor 2 --partitions 3 --topic goodstopic

参数说明:

–zookeeper 链接zk
–replication-factor 指定副本数目(副本数目不能大于总的brokers数目)
–partitions 指定分区数
–topic 指定topic名称
删除topic:
kafka-topics.sh --zookeeper cluster1:2181 --delete --topic ordertopic
生产消息:
kafka-console-consumer.sh --bootstrap-server cluster1:90
92 --from-beginning --topic goodstopic
消费消息:
kafka-console-consumer.sh --bootstrap-server cluster1:90
92 --from-beginning --topic goodstopic
查看一个topic详情
kafka-topics.sh --zookeeper cluster1:2181 --describe --topic goodstopic

partitioncount 分区总数量
replicationfactor 副本数量
partition 分区
leader 每个分区有3个副本,每个副本都有leader
replicas 所有副本节点,不管leader follower
isr: 正在服务中的节点

常见问题

常见面试题

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/707979.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号