一、消息队列:
1. 消息队列:2. 消息队列中间:3. 消息队列的应用场景:4. 生产者, 消费者模型:5. 消息队列的两种模式:
5.1 点对点模式5.2 发布订阅模式 二、Kafka简介:
1. 什么是kafka2, 应用场景:3. kafka生态圈 三、kafka环境部署
1. 搭建集群环境 2.目录结构3.onekey脚本
3.1 一键启动3.2 一键关闭 4. docker启动:
一、消息队列: 1. 消息队列:消息队列, 英文名: Message Queue, 常缩写为: MQ;
消息队列是一种用来存储消息的队列;
很多时候消息队列不是一个永久性的存储, 是作为临时存储的
// 1. 创建一个保存字符串的队列 strQueue := make(chan string, 20) // 2. 往消息队列里放入消息 strQueue <- "hello" // 3. 从消息队列里取出数据 fmt.Println(<- strQueue)2. 消息队列中间:
消息队列中间件就是用来存储消息的软件(组件);
常用的消息中间件: kafka, RabbitMQ, ActiveMQ, RocketMQ, ZeroMQ等;
异步处理:
可以将一些比较耗时的操作放在其他系统中, 通过消息队列将需要进行处理的消息进行存储, 其他系统可以消费消息队列中的数据;比较常见: 发送短信, 发送邮件;
系统解耦:
原先一个微服务通过接口(http)调用另一个微服务, 此时耦合很严重, 只要接口发生变化就会导致不可用;使用消息队列可以将系统进行解耦合, 现在第一个微服务可以将消息放到消息队列中, 另一个服务从消息队列中取出数据再进行处理;
流量削峰;
消息队列是低延迟, 高可靠, 高吞吐的, 可以应对大量并发;
日志处理: (大数据领域常见)
用户行为分析:
4. 生产者, 消费者模型:
消息发送者生产消息发送到消息队列中, 然后消息接收者从消息队列中取出来并且消费消息. 消息被消费后, 消息队列不再有存储, 所以消息接收者不可能消费到已经被消费的消息;
点对点模式的特点:
每个消息只有一个接收者(Consumer), 一旦被消费后, 消息就不再在消息队列中;发送者和接收者之间没有依赖, 发送者发送消息后, 不管有没有接收者, 都不会影响到发送者发送下一条消息;接收者在成功接收消息后需向队列应答成功, 一遍消息队列删除当前接收的消息; 5.2 发布订阅模式
发布/订阅模式特点:
每个消息可以有多个订阅者;发布者和订阅者之间有时间上的依赖; 针对某个主题(topic)的订阅者, 它必须创建订阅者之后, 才能消费发布者的消息;为了消费消息, 订阅者需要提前订阅该角色, 并保持在线运行; 二、Kafka简介: 1. 什么是kafka
Kafka是由Apache软件基金会开发的一个开源流平台, 由Scala和java编写;
发布和订阅数据流, 类似于消息队列或者是企业消息传递系统;以为容错的持久化方式存储数据流;处理数据流; – kafka streams 2, 应用场景:
建立实时数据通道, 以可靠地在系统或应用之间获取数据;构建实时流应用程序, 以转换或响应数据流;
其中:Producers: 可以有很多的应用程序, 将消息数据放入到kafka集群中;Consumers: 可以有很多的应用程序, 将消息数据从kafka集群中拉取出来;Connectors: kafka的连接器可以将数据库中的数据导入到kafka, 也可以将kafka的数据导出到数据库中;Stream Processors: 流处理器可以从kafka拉取数据, 也可以将数据写入到kafka;
| 特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|---|
| 所属 | Apache | Moailla pubilc license | Apache | Apache/Ali |
| 成熟度 | 成熟 | 成熟 | 成熟 | 比较成熟 |
| 生产者-消费者模式 | √ | √ | √ | √ |
| Request-Reply(请求-响应模式) | √ | √ | x | √ |
| Api完备度 | 高 | 高 | 高 | 低(静态配置) |
| 多语言支持 | 支持java优先 | 语言无关 | 支持, java优先 | 支持 |
| 单击吞吐量 | 万级(最差) | 万级 | 十万级 | 十万级(最多) |
| 消息延迟 | 微秒级 | 毫秒级 | ||
| 可用性 | 高(主从)高(主从) | 非常高(分布式) | 高 | |
| 消息丢失 | 低 | 理论上不会丢失 | ||
| 消息重复 | 控制 | 理论上不会重复 | ||
| 事务 | √ | x | √ | √ |
| 文档完备 | 高 | 高 | 高 | 中 |
| 首次部署难度 | 低 | 中 | 高 |
生态圈官网: https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem
三、kafka环境部署kafka版本号: kafka_2.12-2.4.1
其中: 2.12 是scala的版本; 2.4.1 是kafka版本;
发版记录: https://kafka.apache.org/downloads
准备kafka安装包
tar -xvzf kafka_2.12-2.4.1.tgz -C ./ cd ./kafka_2.12-2.4.1/
修改server.properties
cd ./kafka_2.12-2.4.1/config vi server.properties
# 指定broker的id broker.id=0 # 指定kafka数据的位置 log.dirs=/path/to/kafka/data
将安装好的kafka复制到另外的服务上, 并修改broker.id=n
配置KAFKA_HOME环境变量:
/etc/profile
export KAFKA_HOME=/path/to/kafka/
export PATH=:$PATH:${KAFKA_HOME}
source /etc.portfile
启动服务器:
# 启动zookeeper nohup bin/zookeeper-server-start.sh config/zookeep.properties & # 启动kafka cd /path/to/kafka/ nohup bin/kafka-server-start.sh config/server.properties & # 测试kafka集群是否启动成功 bin/kafka-topic.sh --bootstrap-server localhost:9092 --list2.目录结构
| 目录名称 | 说明 |
|---|---|
| bun | kafka的所有执行脚本都在这里; 例如: 启动kafka服务, 创建topic, 生产者, 消费者 |
| config | 所有配置文件 |
| libs | 运行kafka所需要的所有jar包 |
| logs | kafak的所有日志文件 |
| site-docs | 帮助文档 |
准备集群节点列表文件
/export/onekey/slave
192.168.100.1 192.168.100.2 192.168.100.33.1 一键启动
cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile; export JMX_PORT=9988; nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 &" &
}&
wait
done
3.2 一键关闭
cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;; jps |grep kafka | cut -d ' ' -f1 | xargs kill -s 9"
}&
wait
done
4. docker启动:
相关文档: https://hub.docker.com/r/bitnami/kafka
./docker-compose.yml
version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:latest
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:latest
ports:
# - "9092:9092"
- "9093:9093"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_ConNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local



