栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kakfa从入门到放弃(一): kafka入门

kakfa从入门到放弃(一): kafka入门

文章目录

一、消息队列:

1. 消息队列:2. 消息队列中间:3. 消息队列的应用场景:4. 生产者, 消费者模型:5. 消息队列的两种模式:

5.1 点对点模式5.2 发布订阅模式 二、Kafka简介:

1. 什么是kafka2, 应用场景:3. kafka生态圈 三、kafka环境部署

1. 搭建集群环境 2.目录结构3.onekey脚本

3.1 一键启动3.2 一键关闭 4. docker启动:

一、消息队列: 1. 消息队列:

消息队列, 英文名: Message Queue, 常缩写为: MQ;
消息队列是一种用来存储消息的队列;
很多时候消息队列不是一个永久性的存储, 是作为临时存储的

// 1. 创建一个保存字符串的队列
strQueue := make(chan string, 20)

// 2. 往消息队列里放入消息
strQueue <- "hello"

// 3. 从消息队列里取出数据
fmt.Println(<- strQueue)
2. 消息队列中间:

消息队列中间件就是用来存储消息的软件(组件);
常用的消息中间件: kafka, RabbitMQ, ActiveMQ, RocketMQ, ZeroMQ等;

3. 消息队列的应用场景:

异步处理:

可以将一些比较耗时的操作放在其他系统中, 通过消息队列将需要进行处理的消息进行存储, 其他系统可以消费消息队列中的数据;比较常见: 发送短信, 发送邮件;

系统解耦:

原先一个微服务通过接口(http)调用另一个微服务, 此时耦合很严重, 只要接口发生变化就会导致不可用;使用消息队列可以将系统进行解耦合, 现在第一个微服务可以将消息放到消息队列中, 另一个服务从消息队列中取出数据再进行处理;

流量削峰;

消息队列是低延迟, 高可靠, 高吞吐的, 可以应对大量并发;

日志处理: (大数据领域常见)

用户行为分析:
4. 生产者, 消费者模型:

5. 消息队列的两种模式: 5.1 点对点模式


消息发送者生产消息发送到消息队列中, 然后消息接收者从消息队列中取出来并且消费消息. 消息被消费后, 消息队列不再有存储, 所以消息接收者不可能消费到已经被消费的消息;
点对点模式的特点:

每个消息只有一个接收者(Consumer), 一旦被消费后, 消息就不再在消息队列中;发送者和接收者之间没有依赖, 发送者发送消息后, 不管有没有接收者, 都不会影响到发送者发送下一条消息;接收者在成功接收消息后需向队列应答成功, 一遍消息队列删除当前接收的消息; 5.2 发布订阅模式


发布/订阅模式特点:

每个消息可以有多个订阅者;发布者和订阅者之间有时间上的依赖; 针对某个主题(topic)的订阅者, 它必须创建订阅者之后, 才能消费发布者的消息;为了消费消息, 订阅者需要提前订阅该角色, 并保持在线运行; 二、Kafka简介: 1. 什么是kafka


Kafka是由Apache软件基金会开发的一个开源流平台, 由Scala和java编写;

发布和订阅数据流, 类似于消息队列或者是企业消息传递系统;以为容错的持久化方式存储数据流;处理数据流; – kafka streams 2, 应用场景:

建立实时数据通道, 以可靠地在系统或应用之间获取数据;构建实时流应用程序, 以转换或响应数据流;

其中:Producers: 可以有很多的应用程序, 将消息数据放入到kafka集群中;Consumers: 可以有很多的应用程序, 将消息数据从kafka集群中拉取出来;Connectors: kafka的连接器可以将数据库中的数据导入到kafka, 也可以将kafka的数据导出到数据库中;Stream Processors: 流处理器可以从kafka拉取数据, 也可以将数据写入到kafka;

特性ActiveMQRabbitMQKafkaRocketMQ
所属ApacheMoailla pubilc licenseApacheApache/Ali
成熟度成熟成熟成熟比较成熟
生产者-消费者模式
Request-Reply(请求-响应模式)x
Api完备度低(静态配置)
多语言支持支持java优先语言无关支持, java优先支持
单击吞吐量万级(最差)万级十万级十万级(最多)
消息延迟微秒级毫秒级
可用性高(主从)高(主从)非常高(分布式)
消息丢失理论上不会丢失
消息重复控制理论上不会重复
事务x
文档完备
首次部署难度
3. kafka生态圈

生态圈官网: https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

三、kafka环境部署

kafka版本号: kafka_2.12-2.4.1
其中: 2.12 是scala的版本; 2.4.1 是kafka版本;
发版记录: https://kafka.apache.org/downloads

1. 搭建集群环境

准备kafka安装包

tar -xvzf kafka_2.12-2.4.1.tgz -C ./
cd ./kafka_2.12-2.4.1/

修改server.properties

cd ./kafka_2.12-2.4.1/config
vi server.properties
# 指定broker的id
broker.id=0
# 指定kafka数据的位置
log.dirs=/path/to/kafka/data

将安装好的kafka复制到另外的服务上, 并修改broker.id=n

配置KAFKA_HOME环境变量:
/etc/profile

export KAFKA_HOME=/path/to/kafka/
export PATH=:$PATH:${KAFKA_HOME}

source /etc.portfile

启动服务器:

# 启动zookeeper
nohup bin/zookeeper-server-start.sh config/zookeep.properties &

# 启动kafka
cd /path/to/kafka/
nohup bin/kafka-server-start.sh config/server.properties &

# 测试kafka集群是否启动成功
bin/kafka-topic.sh --bootstrap-server localhost:9092 --list
2.目录结构
目录名称说明
bunkafka的所有执行脚本都在这里; 例如: 启动kafka服务, 创建topic, 生产者, 消费者
config所有配置文件
libs运行kafka所需要的所有jar包
logskafak的所有日志文件
site-docs帮助文档
3.onekey脚本

准备集群节点列表文件
/export/onekey/slave

192.168.100.1
192.168.100.2
192.168.100.3
3.1 一键启动
cat /export/onekey/slave | while read line
do
{
	echo $line
	ssh $line "source /etc/profile; export JMX_PORT=9988; nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 &" &
}&
wait
done
3.2 一键关闭
cat /export/onekey/slave | while read line
do
{
	echo $line
	ssh $line "source /etc/profile;; jps |grep kafka | cut -d ' ' -f1 | xargs kill -s 9"
}&
wait
done
4. docker启动:

相关文档: https://hub.docker.com/r/bitnami/kafka
./docker-compose.yml

version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:latest
    ports:
      # - "9092:9092"
      - "9093:9093"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_ConNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758542.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号