#消息队列Message Queue:用来存储消息的队列/组件 #常见的MQ: 1.kafka 基于JAVA开发 2.RabbitMQ 基于Erlang开发 3.ActiveMQ 4.RocketMQ 5.ZeroMQ ##################应用场景:############################### 1.异步处理:注册用户时需要发送短信邮件,这些就可以使用kafka发送 2.系统解耦 3.流量削峰:抢票、抢购等 4.大数据分析与传递:如日志处理:kafka最实用 5.分布式事务 #模型:生产者、消费者模型、存储消息者 #消息传递两种模式: 1.点对点模式(生产者消费者模式): 一个消息只有一个接收者; 两者没有依赖性; 接收者接收到之后通知消息队列删除当前消息。 2.发布订阅模式(观察者模式): 每个消息多个订阅者;可被消息多次 发布者与订阅者有时间上的依赖性,针对某一个topic,必须有订阅者之后才能消费消息; 为了消费消息,订阅者需要提前订阅topic,并保持在线运行。 #############消息队列协议########### #消息中间件采用的不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、Kafka、OpenMessage 协议 #不用HTTP原因:http请求头和响应报文头比较复杂,对于一个消息而言不需要,一定要追求的是高性能,要简洁快速。 #大部分http请求是短链接,一个请求到响应可能会中断,中断以后就不会进行持久化,造成请求的丢失,无法保证消息和数据的高可靠和稳健运行 #所有中间件技术都是基于tcp/ip协议基础上构建新的协议规范、 #AMQP协议 Erlang语言开发,需要安装Erlang; #RabbitMQ、ActiveMQ 支持 1.支持分布式事务 2.消息的持久化 3.高性能和高可靠消息处理 #MQTT协议 #RabbitMQ、ActiveMQ 支持 1.轻量,结构简单 2.传输快,不支持事务 3.不能持久化 #应用:低带宽和网络不稳定场景 #OpenMessage协议 RocketMQ支持 1.结构简单 2.解析速度快 3.支持事务和持久化设计 #Kafka协议 kafka支持 1.结构简单 2.解析速度快 3.不支持事务 4.可以持久化 #############消息队列持久化######### #持久化的简单理解:将数据存入磁盘,而不是内存中 #############消息分发策略########### 见下图!!! 1.发布订阅:每个消费者都会消费消息 2.轮询分发和公平分发:消息只会发给一个消费者;轮询分发不关心消费者性能,均衡分发 #############消息队列高可用和高可靠######## 高可用:集群 高可靠:消息传输过程不丢失和持久化
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tVQusc0c-1646568038639)(picture/消息队列分发机制.PNG)]
1.kafka#由Apache软件基金会开发的由Scala和java编写,是一种高吞吐量的分布式发布订阅消息系统。不支持事务。 #优点: 1.解耦 2冗余(确保消息被消费) 3 扩展性 4 灵活性&削峰 5可恢复性 6顺序保证 7缓冲 8异步通信 #架构: 1 broker #集群中的服务器节点 2.topic #消息主题,可能在不同的broker上,但是使用者无需关心;一般有多个partition 3.Partition #分区 一个topic至少有一个;每个partition中使用多个segment文件存储;路由规则:指定了一个则直接是;未指定则根据指定的key的hash值得出指定一个;若都没有则轮训选出一个;和leader一一对应,且都有本地备份 4.leader #每个partition有多个副本,仅有一个为主,当前负责数据读写的partition;producer先从zk的/brokers/.../state节点找到leader,将消息发给leader,leader写入本地log,follower从leader中pull消息,写入本地log后leader发送ACK,leader收到所有的ISR中replica的ACK后增加HW(high watermark最后commit的offset)并向producer发送ACK ====负责读写数据===== 5.follower #一个主伴随着多个从 ,所有的写请求都通过leader路由,数据变更会广播给所有follower,如果leader失效,follower会选出一个新的leader;当follower宕机,leader会把这个follower从in syncreplicas(ISR)中删除并重新创建一个follower,====follower只负责备份==== 6.replicas #partition的备份,i个partition,n个broker,j个replica:则replica备份到(i+j)/n节点上 7.producer #生产者 8.consumer #消费者 9.consumer group #消费者组:如果消费者不分配则自动属于默认的group;这个消费者组共享一组偏移量,防止数据重复消费,因为一个topic有多个分区 10.offset 偏移量 #每条消息都有一个自增的编号,用于标识消息,决定读取数据的位置,消息被消费后并不会被马上删除,默认生命周期为1周。即不同的消费者组可以重复消费一条消息 11.zookeeper #注册中心,存储集群的meta信息 #关系: kafka--->多个topic--->多个partition--->多个repalicas副本(partition内容) topic与broker是多对多关系 topic对应的映射关系存储在zk的节点上1.1集群搭建
#kafka集群是必须要依赖zookeeper的 注意: 1.下载解压kafka安装包,搭建集群每个kafka节点都需要修改broker.id(节点标识,不可重复) 2.需要配置log.dir目录,zk地址、端口 3.修改环境变量 /etc/profile文件中添加export ... 然后设置文件生效 source /etc/profile 4.启动集群 kafka-server-start.sh -daemon /.../../server.properties1.2常用命令与压测命令
#命令行使用(kafka Tool工具也可以直接操作,也可看一些kafka信息,还可以看zookeeper的一些信息) #指定JMX端口启动可以方便监控kafka集群 JMX_PORT=9991 kafka-server-start.sh -daemon /.../../server.properties 1.创建一个test名称的topic主题 副本数为2,分区数为3 #2.2版本之前用 --zookeeper node1:2081,node2:2181,2.2之后可用--bootstrap-server代替--zookeeper只写一个ip效果一样 bin/kafka-topics.sh --create --bootstrap-server ip:9092,ip2:9092 --partitions 3 --replication-factor 2 --topic test 2.查看目前有的主题 bin/kafka-topics.sh --list --bootstrap-server ip:9092 3.查看指定的topic(删除指定的topic) bin/kafka-topics.sh --describe(--delete) --bootstrap-server ip:9092,ip2:9092 --topic test 4.增加topic的partition数 3个 bin/kafka-topics.sh --bootstrap-server ip:9092 --alter --topic test --partitions 3 5.生产消息到kafka bin/kafka-console-producer.sh --broker-list ip:9092 --topic test(回车输入值即可添加消息) 6.消费消息 bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test --from-beginning(从头拉取) #从尾部开始,需指定分区 bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test --offset latest --partition 0 --max-messages 1 #取指定个数 7.指定消费者组 bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test --group testGroup --from-beginning 8.查看group详情 kafka-consumer-groups.sh --bootstrap-server ip:9092 --group test_group --describe TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 5 5 0 - - - # CURRENT-OFFSET: 当前消费者群组最近提交的 offset,也就是消费者分区里读取的当前位置 # LOG-END-OFFSET: 当前最高水位偏移量,也就是最近一个读取消息的偏移量,同时也是最近一个提交到集群的偏移量 # LAG:消费者的 CURRENT-OFFSET 与 broker 的 LOG-END-OFFSET 之间的差距 即消息的堆积量 9.查看group列表 kafka-consumer-groups.sh --bootstrap-server ip:9092 --list 10.删除group kafka-consumer-groups.sh --bootstrap-server ip:9092 --group testGroup --delete 9.消费者组中的所有活动成员 ./kafka-consumer-groups.sh --bootstrap-server ip:9098 --describe --group group1 --members #基准测试benchmark testing 1.创建topic(一个分区一个副本) tName topic名称 bin/kafka-topics.sh --zookeeper ip:2181 --create --topic tName --partitions 1 --replication-factor 1 2.生产消息测试 bin/kafka-producer-perf-test.sh --topic tName --num-records 5000000 --throughput -1 --rocord-size 1000 --producer-props bootstrap.servers=ip1:9092,ip2:9092... acks=1 --topic topic名字 --num-records 总共指定的生产数据默认5000W --throughput 指定吞吐量 -1表示不指定(recors/sec) --rocord-size record数据大小(字节) --producer-props bootstrap.servers=ip1:9092,ip2:9092... acks=1 指定集群地址ack模式 3.消费者测试(一般比生产者快) bin/kafka-consumer-perf-test.sh --broker-list ip1:9092,ip2:9092... --topic tName --fetch-size 100000 --messages 5000000 --fetch-size 每次拉取数据的大小 --messages 总共消费的个数1.3kafka数据检索机制
1.topic在物理层以partition为分组,一个topic分为多个partition 2.partition为多个Segment组成segment参数有两个: log.segment.bytes:数据容量,默认1GB log.segment.ms:commit未写满的segment前,所等待的时间默认7天、 3.logSegment文件由两部分组成,分别为.index和.log文件即索引文件和数据文件: partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment晚间最后一条消息的offset值 数值大小为64位,20位数字字符长度,没有数字用0填充例如:第一个segment:0000000000000000000.index;第二个为0000000000000012345.index 4.消息都有固定的物理结构:offset(8Bytes)、消息体大小(4Bytes)、crc32(4Bytes)、magic(1Byte)、attributes(1Byte)、key lenth(4Bytes)、payload(N bytes)等可以确定一条消息的大小1.4.数据安全性
#生产者方面:producer delivery guarantee 0:At least one 消息不会丢,但是可能会重复 1:At most once 消息可能会丢,但是绝对不会重复 2:Exactly once 每条消息肯定会被传输一次且仅一次 #producers 可以选择是否为数据的写入接收ack,request.required.acks acks=0:producer在ISR中的leader已成功接收并得到确认后发送下一条message acks=1:producer无需等待来自broker的确认而继续发送下一批消息 acks=all:producer需要等待ISR中所有的follower都确认收到消息才算发送完成,可靠性最高 ##ISR机制 #关键词 1.AR assigned replicas 用来标识副本全集 2.OSR out-sync replicas 离开同步队列的副本 3.ISR in-sync replicas 加入同步队列的副本 4.ISR = leader+没有落后太多的副本;AR = OSR+ISR #ISR选举机制 1.当主节点挂掉,并不是去follower中选择主,而是从ISR中选择,ISR为空,则从follower中选取 2.判断标准: 1)超过10S没有同步数量 replica.lag.time.max.ms = 10000 2)主副节点差4000条数据 replica.lag.max.messages = 4000 3)脏节点选择 kafka采用一种降级措施来处理 选举第一个恢复的node作为leader,以它的数据为基准(脏leader选举) ##Broker数据存储机制 1.无论消息是否被消费,kafka都会保留所有消息,有两种策略可以删除旧数据 1)基于时间 log.retention.hours = 168 2)基于大小 log.retention.bytes = 1073741824 ##消费者方面:consumer delivery guarantee 1.如果将consumer设置为autocommit,consumer一旦督导数据立即自动commit,如果只讨论这一读取消息的过程,那kafka确保了exactly once 2.读完消息先commit再处理消息:如果consumer再commit之后没来得及处理就crash了,下次重新开始工作后无法读到刚刚已提交而未处理的消息 对应 At most once 3.读完消息先处理再commit,如果commit之前consumer 崩溃,下次重新工作还会再处理刚刚未提交的消息,对应 at least once 4.如果一定要做到exactly once 就需要协调offset和实际操作的输出:经典的做法是引入两阶段提交 #kafka 默认保证at least once 并且允许通过设置producer异步提交来实现at most once ##数据的消费 1.partition_num=2,启动一个consumer进程订阅这个topic,对应的stream_num设置为2,也就是说启动两个线程处理message 2.如果auto.commit.enable=true: 当consumer fetch了一些数据还没有完全处理掉的时候,刚好到commit interval触发了offset操作,接着consumer crash了,这时已经fetch的数据还没处理完但已经commit,因此没有机会再被处理,数据丢失。 3.如果auto.commit.enable=false: 假设consumer的连个fetcher各自拿了一条数据,并且有两个线程同时处理,这时线程t1处理完partition1的数据,手动提交offset(consumer对占有的partition进行commit,kafka没有提供更细粒度的方式),如果t2没有处理完数据并crash了,t2的这条数据也就丢失了 #解决办法: 1.手动commit offset,并针对partition_num启同样数目的consumer进程,这样就能保证一个consumer进程占用一个partition,commit offset的时候不会影响其他partition的offset,具有局限性 2.手动commit offset,另外再consumer端再将所有fetch到的数据缓存到queue里,当把queue里所有的数据处理完之后,再批量提交offset,这样就能保证处理完的数据才被commit1.5.JavaApi
//导入相应包Springbootorg.apache.kafka kafka-clients 2.8.0 org.apache.commons commons-io 1.3.2
spring:
kafka:
bootstrap-servers:127.0.0.1:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
package com.example.demo.kafka;
//消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
public void topic_test(ConsumerRecord, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)
public void topic_test1(ConsumerRecord, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test1 消费了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
}
//生产者
package com.example.demo.kafka;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Component
@Slf4j
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
//自定义topic
public static final String TOPIC_TEST = "topic.test";
public static final String TOPIC_GROUP1 = "topic.group1";
public static final String TOPIC_GROUP2 = "topic.group2";
public void send(Object obj) {
String obj2String = JSONObject.toJSONString(obj);
log.info("准备发送消息为:{}", obj2String);
//发送消息
ListenableFuture> future = kafkaTemplate.send(TOPIC_TEST, obj);
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理
log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult stringObjectSendResult) {
//成功的处理
log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
}
});
}
}
1.6.kafka配置文件参数
####################BROKER 的全局配置############################ #最为核心的三个配置 broker.id、log.dir、zookeeper.connect 。 ##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers broker.id =1 ##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2 log.dirs = /tmp/kafka-logs ##提供给客户端响应的端口 port =6667 ##消息体的最大大小,单位是字节 message.max.bytes =1000000 ## broker 处理消息的最大线程数,一般情况下不需要去修改 num.network.threads =3 ## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数 num.io.threads =8 ## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改 background.threads =4 ## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制 queued.max.requests =500 ##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置 host.name ## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究 advertised.host.name ## 广告地址端口,必须不同于port中的设置 advertised.port ## socket的发送缓冲区,socket的调优参数SO_SNDBUFF socket.send.buffer.bytes =100*1024 ## socket的接受缓冲区,socket的调优参数SO_RCVBUFF socket.receive.buffer.bytes =100*1024 ## socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖 socket.request.max.bytes =100*1024*1024 #########################################LOG 相关 ############################## ## topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖 log.segment.bytes =1024*1024*1024 ## 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖 log.roll.hours =24*7 ## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖 log.cleanup.policy = delete ## 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据 ## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖 log.retention.minutes=7days ##指定日志每隔多久检查看是否可以被删除,默认1分钟 log.cleanup.interval.mins=1 ## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes 。-1没有大小限制 ## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖 log.retention.bytes=-1 ## 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略 log.retention.check.interval.ms=5minutes ## 是否开启日志压缩 log.cleaner.enable=false ## 日志压缩运行的线程数 log.cleaner.threads =1 ## 日志压缩时候处理的最大大小 log.cleaner.io.max.bytes.per.second=None ## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好 log.cleaner.dedupe.buffer.size=500*1024*1024 ## 日志清理时候用到的IO块大小 一般不需要修改 log.cleaner.io.buffer.size=512*1024 ## 日志清理中hash表的扩大因子 一般不需要修改 log.cleaner.io.buffer.load.factor =0.9 ## 检查是否处罚日志清理的间隔 log.cleaner.backoff.ms =15000 ## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖 log.cleaner.min.cleanable.ratio=0.5 ## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖 log.cleaner.delete.retention.ms =1day ## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖 log.index.size.max.bytes =10*1024*1024 ## 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数 log.index.interval.bytes =4096 ## log文件"sync"到磁盘之前累积的消息条数 ## 因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段 ## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡. ## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞) ## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟. ## 物理server故障,将会导致没有fsync的消息丢失. log.flush.interval.messages=None ## 检查是否需要固化到硬盘的时间间隔 log.flush.scheduler.interval.ms =3000 ## 仅仅通过interval来控制消息的磁盘写入时机,是不足的. ## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔 ## 达到阀值,也将触发. log.flush.interval.ms = None ## 文件在索引中清除后保留的时间 一般不需要去修改 log.delete.delay.ms =60000 ## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改 log.flush.offset.checkpoint.interval.ms =60000 ####################TOPIC 相关 ######################## ## 是否允许自动创建topic ,若是false,就需要通过命令创建topic auto.create.topics.enable =true ## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数 default.replication.factor =1 ## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖 num.partitions =1 实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。 #############################复制(Leader、replicas) 相关 ########################## ## partition leader与replicas之间通讯时,socket的超时时间 controller.socket.timeout.ms =30000 ## partition leader与replicas数据同步时,消息的队列尺寸 controller.message.queue.size=10 ## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中 replica.lag.time.max.ms =10000 ## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效 ## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后 ## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移 ## 到其他follower中. ## 在broker数量较少,或者网络不足的环境中,建议提高此值. replica.lag.max.messages =4000 ##follower与leader之间的socket超时时间 replica.socket.timeout.ms=30*1000 ## leader复制时候的socket缓存大小 replica.socket.receive.buffer.bytes=64*1024 ## replicas每次获取数据的最大大小 replica.fetch.max.bytes =1024*1024 ## replicas同leader之间通信的最大等待时间,失败了会重试 replica.fetch.wait.max.ms =500 ## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件 replica.fetch.min.bytes =1 ## leader 进行复制的线程数,增大这个数值会增加follower的IO num.replica.fetchers=1 ## 每个replica检查是否将最高水位进行固化的频率 replica.high.watermark.checkpoint.interval.ms =5000 ## 是允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker controlled.shutdown.enable =false ## 控制器关闭的尝试次数 controlled.shutdown.max.retries =3 ## 每次关闭尝试的时间间隔 controlled.shutdown.retry.backoff.ms =5000 ## 是否自动平衡broker之间的分配策略 auto.leader.rebalance.enable =false ## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡 leader.imbalance.per.broker.percentage =10 ## 检查leader是否不平衡的时间间隔 leader.imbalance.check.interval.seconds =300 ## 客户端保留offset信息的最大空间大小 offset.metadata.max.bytes ######################################ZooKeeper 相关############################# ##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3 zookeeper.connect = localhost:2181 ## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大 zookeeper.session.timeout.ms=6000 ## ZooKeeper的连接超时时间 zookeeper.connection.timeout.ms =6000 ## ZooKeeper集群中leader和follower之间的同步时间 zookeeper.sync.time.ms =2000 配置的修改 其中一部分配置是可以被每个topic自身的配置所代替,例如 新增配置 bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1 修改配置 bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000 删除配置 : bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes #######################ConSUMER 配置######################## #最为核心的配置是group.id、zookeeper.connect ## Consumer归属的组ID,broker是根据group.id来判断是队列模式还是发布订阅模式,非常重要 group.id ## 消费者的ID,若是没有设置的话,会自增 consumer.id ## 一个用于跟踪调查的ID ,最好同group.id相同 client.id = group id value ## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置 zookeeper.connect=localhost:2182 ## zookeeper的心跳超时时间,查过这个时间就认为是dead消费者 zookeeper.session.timeout.ms =6000 ## zookeeper的等待连接时间 zookeeper.connection.timeout.ms =6000 ## zookeeper的follower同leader的同步时间 zookeeper.sync.time.ms =2000 ## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常 auto.offset.reset = largest ## socket的超时时间,实际的超时时间是:max.fetch.wait + socket.timeout.ms. socket.timeout.ms=30*1000 ## socket的接受缓存空间大小 socket.receive.buffer.bytes=64*1024 ##从每个分区获取的消息大小限制 fetch.message.max.bytes =1024*1024 ## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset auto.commit.enable =true ## 自动提交的时间间隔 auto.commit.interval.ms =60*1000 ## 用来处理消费消息的块,每个块可以等同于fetch.message.max.bytes中数值 queued.max.message.chunks =10 ## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 ## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 ##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, ## 此值用于控制,注册节点的重试次数. rebalance.max.retries =4 ## 每次再平衡的时间间隔 rebalance.backoff.ms =2000 ## 每次重新选举leader的时间 refresh.leader.backoff.ms ## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求 fetch.min.bytes =1 ## 若是不满足最小大小(fetch.min.bytes)的话,等待消费端请求的最长等待时间 fetch.wait.max.ms =100 ## 指定时间内没有消息到达就抛出异常,一般不需要改 consumer.timeout.ms = -1 ############################# PRODUCER 的配置########################## 比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class ## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip metadata.broker.list ##消息的确认模式 ##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP ##1:发送消息,并会等待leader 收到确认后,一定的可靠性 ## -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性 request.required.acks =0 ## 消息发送的最长等待时间 request.timeout.ms =10000 ## socket的缓存大小 send.buffer.bytes=100*1024 ## key的序列化方式,若是没有设置,同serializer.class key.serializer.class ## 分区的策略,默认是取模 partitioner.class=kafka.producer.DefaultPartitioner ## 消息的压缩模式,默认是none,可以有gzip和snappy compression.codec = none ## 可以针对默写特定的topic进行压缩 compressed.topics=null ## 消息发送失败后的重试次数 message.send.max.retries =3 ## 每次失败后的间隔时间 retry.backoff.ms =100 ## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据 topic.metadata.refresh.interval.ms =600*1000 ## 用户随意指定,但是不能重复,主要用于跟踪记录消息 client.id="" ------------------------------------------- 消息模式 相关 ------------------------- ## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送 producer.type=sync ## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送 queue.buffering.max.ms =5000 ## 异步的模式下 最长等待的消息数 queue.buffering.max.messages =10000 ## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃 queue.enqueue.timeout.ms = -1 ## 异步模式下,每次发送的最大消息数,前提是触发了queue.buffering.max.messages或是queue.buffering.max.ms的限制 batch.num.messages=200 ## 消息体的系列化处理类 ,转化为字节流进行传输 serializer.class= kafka.serializer.DefaultEncoder1.7.kafka参数优化
#其他内容再百度一下 #partition数目 1.调整后影响:一般来说,每个partition能处理吞吐量为几MB/s(需根据硬件具体分析),增加partition数量意味着: 1)更高的并行度和吞吐 2)可以扩展更多的同一个group中的consumers 3)更大程度利用闲置的brokers 但是会造成zk的更多选举 2. 调整原则: 1)一般来说,若集群较小(小于6个broker),则配置2*broker的partition数 2)若集群大于12个,则配置1*broker数 3)考虑最高峰吞吐需要的并行consumer数,若应用场景需要20个同一group的consumer,则设置为20个partition 4)考虑producer所需吞吐,如果吞吐非常高,则增加partition数 #replication factor 此参数决定的是records复制的数目,建议至少为2,一般为3,最高设置为4 1.更高的replication factor意味着: 系统更稳定,允许N-1个broker党纪 更多的副本,如果acks=all,则会有较高延迟 系统磁盘利用率更高 2.调整原则 以3为起始,同时也不建议一个kafka集群中节点数少于3个节点 如果replication的性能成为了瓶颈,则建议使用性能更好的broker,而不是降低RF数目,且生产环境不要设置为1 #批量写入 为了大幅度提高producer写入吞吐量,需要定期批量写文件 每当producer写入10000条数据,写入到磁盘 log.flush.interval.messages=10000 每隔1S中,写入磁盘 log.flush.interval.ms=10001.8.Flume集成kafka
#Flume 是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,通过监控整个文件目录或者某一个特定文件,用于收集数据;同时Flume也 提供数据写到各种数据接受方(可定制)的能力,用于转发数据。Flume 的易用性在于通过读取配置文件,可以自动收集日志文件,在大数据处理及各种复杂的情况下,flume 经常被用来作为数据处理的工具 1.下载flume安装包,安装配置环境变量等 2.修改配置文件2.RabbitMQ 2.1入门与安装
#官网https://www.rabbitmq.com/下载找到Erlang所需版本 #Erlang官网https://www.erlang.org/downloads下载对应的版本 erl -v 显示版本号则安装Erlang成功 #安装socat插件 yum install -y socat才能安装rabbitmq #解压rpm安装包 rpm -Uvh xxxx.rpm #安装rabbitMQ yum install rabbitmq -y #启动/查看状态/停止/设置开机启动 systemctl start/status/stop/enable rabbitmq-server ####Docker拉取镜像的话只需一个命令,详情见dockerhub https://registry.hub.docker.com/_/rabbitmq/ ##rabbitmq为什么使用信道channel #大家都知道,在使用rabbitmq时不管是消费还是生产都需要创建信道(channel) 和connection(连接),如下图producer示例。我们完全可以直接使用Connection就能完成信道的工作,为什么还要引入信道呢,试想这样一个场景,一个应用有多个线程需要从rabbitmq中消费,或是生产消息,那么必然会建立很多个connection ,也就是多个tcp连接,对操作系统而言,建立和销毁tcp连接是很昂贵的开销(TCP的创建和销毁,开销大,创建需要三次握手,销毁需要四次分手),如果遇到使用高峰,性能瓶颈也随之显现,rabbitmq采用类似nio的做法,连接tcp连接复用,不仅可以减少性能开销,同时也便于管理。 #每个线程都把持一个信道,所以信道复用了TCP连接。同时rabbitmq可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的connection可以再产生性能瓶颈的情况下有效地节省tcp连接资源,但是当信道本身的流量很大时,这时候多个信道复用一个connection就会产生性能瓶颈,进而是整体的流量被限制了。此时就需要开辟多个connection,将这些信道均摊到这些connection中,至于这些相关调优策略需要根据业务自身的实际情况进行调节。 #####基本概念########### #a. Message 具体的消息,包含消息头(即附属的配置信息)和消息体(即消息的实体内容) 由发布者,将消息推送到 Exchange,由消费者从 Queue 中获取 #b. Publisher 消息生产者,负责将消息发布到交换器(Exchange) #c. Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列,如果没有指定交换机,会有一个默认的交换机(简单、work模式使用默认的),其他的最好指定上 #d. Binding 绑定,用于给 Exchange 和 Queue 建立关系,从而决定将这个交换器中的哪些消息,发送到对应的 Queue,有多个routing key #e.Routing key 路由规则,虚拟机可以用它来确定如何路由一个特定消息 #e. Queue 消息队列,用来保存消息直到发送给消费者 它是消息的容器,也是消息的终点 一个消息可投入一个或多个队列 消息一直在队列里面,等待消费者连接到这个队列将其取走 #f. Connection 连接,内部持有一些 channel,用于和 queue 打交道 #g. Channel 信道(通道),MQ 与外部打交道都是通过 Channel 来的,发布消息、订阅队列还是接收消息,这些动作都是通过 Channel 完成; 简单来说就是消息通过 Channel 塞进队列或者流出队列 #h. Consumer 消费者,从消息队列中获取消息的主体 #i. Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。 虚拟主机是共享相同的身份认证和加密环境的独立服务器域。 每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制由若干个Exchange和queue vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 可以理解为 db 中的数据库的概念,用于逻辑拆分 #j. server/Broker 消息队列服务器
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3yEy5JwO-1646568038641)(picture/v2-18f6b4006d59344833560d4e5291cc54_r.jpg)]
2.2 web管理页面
#默认无管理页面,安装:默认账户密码是guest,只能在本机访问,需要设置远程访问 rabbitmq-plugins enable rabbitmq_management #安装完毕重启服务,需要开放服务器的15672端口 systemctl restart rabbitmq-server #####新增/删除/查看所有 用户 rabbitmqctl add_user admin admin rabbitmqctl delete_user admin rabbitmqctl list_users rabbitmqctl change_password admin pwd #分配权限 rabbitmqctl set_user_tags admin administrator 1.administrator 可以登录控制台、查看所有信息、对mq进行管理 2.monitoring 监控者 可以登录控制台、查看所有信息 3.policymaker 策略制定者 登录控制台、指定策略 4.management 普通管理员,查看自己相关节点信息 登录控制台 5.none 不可访问2.3简单模式
#一个producer--一个队列--一个consumer
持久化队列会存盘;非持久话队列也会存盘,但是会随服务器重启消失
#代码步骤:
1.创建连接
2.创建通道
3.声明队列:#参数:对列名|是否持久化|排他性|是否自动删除|附属参数
#排他性:只能是本连接可见,连接断开时自动删除
#是否自动删除:随着最后一个消费者消费完毕消息是否把队列删除
channel.queueDeclare('对列名',true,false,false,null)
4.准备消息,然后生产者发送给队列
#没指定交换机的时候会默认指定一个交换机
#参数:
exchange:交换机
routingKey:路由键,fanout模式没有,直接写的是队列名 #匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
mandatory:为true时如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用
basic.return方法将消息返还给生产者。为false时出现上述情形broker会直接将消息扔掉
immediate:为true时如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
props:需要注意的是BasicProperties.deliveryMode,1:不持久化 2:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
body:要发送的信息
#channel.basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
4.1 消费者消费消息Basic.consume
5.关闭通道、关闭连接
2.4 Fanout模式
#简单发布订阅模式 producer--》交换机--》多个队列---》多个consumer(无routing-key,指定也无效)
代码步骤在上面的基础上:会在发送给队列消息之前指定交换机,
channel.exchangeDeclare(exchange,"fanout");#声明交换机,并指定类型;
然后绑定队列和交换机
channel.queueBind('队列名','交换机名','routingkey');
#指定交换机、指定队列、绑定也可以手动在图形化先指定好,或者命令
2.5 Direct模式
#路由模式/指定routing-key的发布订阅模式 #实际使用中较多 producer--》交换机--不同的队列是同一个或不同路由(如/info /error)-->多个队列-->多个consumer 代码步骤在上面的基础上:会在发送给队列消息之前,指定交换机再指定路由key
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KAcbDkMR-1646568038642)(picture/144712_rWkA_3698467.png)]
2.6 Topic模式#支持模糊匹配的routing-key的发布订阅模式 producer--》交换机--不同的队列是同一个或不同路由-->多个队列-->多个consumer # *(星号)表示一个单词 # #(井号)表示零个或者多个单词 #如果消费者端的路由关键字只使用【#】来匹配消息,在匹配【topic】模式下,它会变成一个分发【fanout】模式,接收所有消息。 #如果消费者端的路由关键字中没有【#】或者【*】,它就变成直连【direct】模式来工作。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mKHvitOc-1646568038642)(picture/20191029173633425.png)]
2.7 Headers模式#根据headers找队列的发布订阅模式 #headers exchange与 direct、topic、fanout不同,它是通过匹配 AMQP 协议消息的 header 而非路由键,有点像HTTP的Headers;headers exchange与 direct Exchange类似,性能方面比后者查很多,所以在实际项目中用的很少。 在绑定Queue与Exchange时指定一组header键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式), 对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对。如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。 headers属性是一个键值对,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。 键值对要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的header是需要全部匹配(all),还是仅匹配一个键(any)就可以了。
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.bs.utils.ConnectionUtil;
public class Producer {
private final static String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
//1、获取连接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
//2、声明信道
Channel channel = connection.createChannel();
//3、声明交换器,类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//4、创建消息
String message = "hello rabbitmq";
//5、发布消息
channel.basicPublish(EXCHANGE_NAME, "add", null, message.getBytes());
System.out.println("生产者发送" + message + "'");
//6、关闭通道
channel.close();
//7、关闭连接
connection.close();
}
}
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.bs.utils.ConnectionUtil;
public class Consumer1 {
private final static String QUEUE_NAME = "direct_queue_1";
private final static String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception{
//1、获取连接
Connection connection = ConnectionUtil.getConnection("localhost",5672,"/","guest","guest");
//2、声明通道
Channel channel = connection.createChannel();
//3、声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4、绑定队列到交换机,指定路由key为update
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"add");
//同一时刻服务器只会发送一条消息给消费者,设置为手动应答的时候也必须有此参数
channel.basicQos(1);//根据服务器性能设置值
//5、定义队列的消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6、监听队列,手动返回完成状态 false代表关闭手动应答,此时为公平分发模式
channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
//为true时表示自动应答,无须下面的channel.basicAck(),此时为轮询模式
//channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
//7、获取消息
while (true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 消费者1:" + message + "'");
//消费者1接收一条消息后休眠10毫秒
Thread.sleep(10);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
//为什么要手动应答???????
//答:自动应答为轮询分发模式,手动应答是公平分发模式,会根据服务器性能来
}
}
2.8 work queues模式
#两种模式:轮询模式和公平分发模式 #实际使用中较多 轮询是指 一个消费者一条,按均分配 公平分发是指根据消费者的消费能力进行公平分发,处理快的分发的多。 # 具体的代码区别 见上面消费者。2.9 RabbitMQ使用场景
#解耦、削峰、异步 #异步:使用线程池多线程去进行异步操作,但是需要维护线程池、持久性、高可靠等,且具有高耦合性2.10 Springboot集成rabbitmq
org.springframework.boot spring-boot-starter-amqp
server:
port: 9000
spring:
application:
name: rabbitmq-provider
rabbitmq:
host: 192.168.1.45
port: 5672
username: admin
password: admin
#virtual-host: /
# ------- 消息确认配置项 --------
# 确认消息已发送到交换机
#publisher-/confirm/is: true
# 确认消息已发送队列
#publisher-returns: true
#高版本报错可能得使用: publisher-/confirm/i-type: correlated
//消费消息注解
@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
@RabbitHandler
public void processMessage1(String message) {
System.out.println(message);
}
@RabbitHandler
public void processMessage2(byte[] message) {
System.out.println(new String(message));
}
}
//通过 @RabbitListener 的 bindings 属性声明 Binding(若 RabbitMQ 中不存在该绑定所需要的 Queue、Exchange、RouteKey 则自动创建,若存在则抛出异常)
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
value = @Queue(value = "consumer_queue",durable = "true"),
key = "key.#"
))
//使用这个注解绑定的话就不需要配置类去绑定交换机、队列及其关系.建议用配置类
DirectExchange配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
// 配置Jackson2JsonMessageConverter来支持消息内容JSON序列化与反序列化
@Bean
public RabbitListenerContainerFactory> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@Bean
public DirectExchange myDirectExchange() {
// 参数意义:
// name: 名称
// durable: true
// autoDelete: 自动删除
return new DirectExchange("myDirectExchange", true, false);
}
@Bean
public Queue myDirectQueue() {
return new Queue("myDirectQueue", true);
}
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(myDirectQueue())
.to(myDirectExchange())
.with("my.direct.routing");
}
}
//模拟发送消息类
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String send() {
String msg = "hello";
//参数:交换机、路由key、具体消息
rabbitTemplate.convertAndSend("exchange-direct", "my.direct.routing", msg);
return "success";
}
//消费消息类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
//@RabbitListener(queues = "myDirectQueue")
public class DirectReceiver{
@RabbitHandler
@RabbitListener(queues = "myDirectQueue")
public void process(String msg) {
System.out.println(msg);
}
}
FanoutExchange
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {
// ----- 交换机 -----
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange", true, false);
}
// ----- 队列 -----
@Bean
public Queue fanoutQueueA() {
return new Queue("fanoutQueueA", true);
}
@Bean
public Queue fanoutQueueB() {
return new Queue("fanoutQueueB", true);
}
@Bean
public Queue fanoutQueueC() {
return new Queue("fanoutQueueC", true);
}
// ----- 绑定交换机与队列 -----
@Bean
public Binding bindingFanoutA() {
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutB() {
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutC() {
return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
}
}
//发送消息类
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendByFanout")
public String sendByFanout() {
String msg = "hello fanout";
rabbitTemplate.convertAndSend("fanoutExchange", null, msg);
return "success";
}
//消息处理类
package com.rabbitmq.demo.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectReceiver{
@RabbitHandler
@RabbitListener(queues = "fanoutQueueA")
public void processA(String msg) {
System.out.println("fanoutQueueA " + msg);
}
@RabbitHandler
@RabbitListener(queues = "fanoutQueueB")
public void processB(String msg) {
System.out.println("fanoutQueueB " + msg);
}
@RabbitHandler
@RabbitListener(queues = "fanoutQueueC")
public void processC(String msg) {
System.out.println("fanoutQueueC " + msg);
}
}
TopicExchange
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
// 交换机
@Bean
public TopicExchange myTopicExchange() {
return new TopicExchange("myTopicExchange", true, false);
}
// ----- 队列 -----
@Bean
public Queue myTopicQueue_01() {
return new Queue("myTopicQueue_01", true);
}
@Bean Queue myTopicQueue_02() {
return new Queue("myTopicQueue_02", true);
}
@Bean
public Binding binding_01() {
return BindingBuilder.bind(myTopicQueue_01()).to(myTopicExchange()).with("topic.01");
}
@Bean
public Binding binding_02() {
return BindingBuilder.bind(myTopicQueue_02()).to(myTopicExchange()).with("topic.#");
}
}
//发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendByTopic")
public String sendByTopic() {
String msg = "hello topic";
rabbitTemplate.convertAndSend("myTopicExchange", "topic.01", msg + " topic.01");
rabbitTemplate.convertAndSend("myTopicExchange", "topic.xxx", msg + " topic.xxx");
return "success";
}
//处理消息
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectReceiver{
@RabbitHandler
@RabbitListener(queues = "myTopicQueue_01")
public void process_01(String msg) {
System.out.println("myTopicQueue_01 " + msg);
}
@RabbitHandler
@RabbitListener(queues = "myTopicQueue_02")
public void process_02(String msg) {
System.out.println("myTopicQueue_02 " + msg);
}
}
消费者方需要消息确认时:
//生产者配置回调函数:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 开启Mandatory, 才能触发回调函数,无论消息推送结果如何都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("/confirm/iCallback: "+"相关数据:" + correlationData);
System.out.println("/confirm/iCallback: "+"确认情况:" + ack);
System.out.println("/confirm/iCallback: "+"原因:" + cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback: "+"消息:" + message);
System.out.println("ReturnCallback: "+"回应码:" + replyCode);
System.out.println("ReturnCallback: "+"回应信息:" + replyText);
System.out.println("ReturnCallback: "+"交换机:" + exchange);
System.out.println("ReturnCallback: "+"路由键:" + routingKey);
}
});
return rabbitTemplate;
}
}
//消费者配置监听
import com.rabbitmq.demo.mq.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Autowired
private MyAckReceiver myAckReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
// 监听队列名
container.setQueueNames("myDirectQueue");
// 当前消费者数量
container.setConcurrentConsumers(1);
// 最大消费者数量
container.setMaxConcurrentConsumers(1);
// 手动确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置监听器
container.setMessageListener(myAckReceiver);
return container;
}
}
//消费者创建监听器
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// 消息的唯一性ID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String msg = message.toString();
System.out.println("消息: " + msg);
System.out.println("消息来自: "+message.getMessageProperties().getConsumerQueue());
// 手动确认
channel.basicAck(deliveryTag, true);
//消息不会被消费
//channel.basicNack(deliveryTag, true);
} catch (Exception e) {
// 拒绝策略
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
}
2.11 rabbitmq队列-高级篇
//问:队列应该在消费者还是生产者创建
//答:消费者创建可能会丢失消息;生产者创建不会丢失;两者都创建后者启动会覆盖前者;在可视化页面创建最为稳妥
//RabbitMQ设置过期时间TTL:配置类中多写参数给所有消息设置过期时间
@Configuration
public class TTLQueueRabbitConfig {
@Bean
public Queue TTLQueue() {
Map map = new HashMap<>();
map.put("x-message-ttl", 30000); // 队列中的所有消息未被消费则30秒后过期
map.put("x-max-length", 1000); // 设置队列的最大长度
return new Queue("TTL_QUEUE", true, false, false, map);
}
@Bean
public DirectExchange TTLExchange() {
return new DirectExchange("TTL_EXCHANGE", true, false);
}
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
}
}
//单独设置消息的过期时间
import cn.huawei.rabbitmqtest1.pojo.User;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@RestController
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendDirectMessage")
public void sendDirectMessage() {
//第一种写法:
MessageProperties messageProperties = new MessageProperties();
// 设置过期时间,单位:毫秒
messageProperties.setExpiration("30000");
//这个参数是用来做消息的唯一标识
//发布消息时使用,存储在消息的headers中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
User user = new User(i + "", "陈四 " + i);
Message message = new Message(JSON.toJSONString(user).getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", message, correlationData);
//第二种写法:
MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("30000");
return message;
}
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", message, messagePostProcessor);
}
}
}
//消息被拒绝、消息过期、队列达到最大长度 情况会进入死信队列
//下面使用了设计模式-门面? 和上面Map参数赋值是一样的
//注意,如果已经创建了用户队列,再去改参数添加死信队列或其他是不可以的,必须删除原队列
@Configuration
public class Declare {
@Bean
public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
return QueueBuilder
.durable(userQueueName)
//声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置)
.withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
//声明该队列死信消息在交换机的 路由键
.withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
.build();
}
@Bean
public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
return ExchangeBuilder
.topicExchange(userExchangeName)
.durable(true)
.build();
}
@Bean
public Binding userBinding(Queue userQueue, Exchange userExchange) {
return BindingBuilder
.bind(userQueue)
.to(userExchange)
.with("user.*")
.noargs();
}
@Bean
public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
return ExchangeBuilder
.topicExchange(commonDeadLetterExchange)
.durable(true)
.build();
}
@Bean
public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) {
return QueueBuilder
.durable(userDeadLetterQueue)
.build();
}
@Bean
public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) {
return BindingBuilder
.bind(userDeadLetterQueue)
.to(commonDeadLetterExchange)
.with("user-dead-letter-routing-key")
.noargs();
}
}
rabbitmq内存问题
#在服务器上也可以改变配置文件修改内存 # rabbitmq.conf 取其一 vm_memory_high_watermark.relative=0.4 vm_memory_high_watermark.absolute=1GB #也可以使用命令进行分配:取其一 #相对内存: 使用时可以把这个0.4替换成自己想要的百分比(百分比最好在0.4-0.6) rabbitmqctl set_vm_memory_hgih_waterwmark 0.4 #绝对内存 这里的2G根据自己内存的大小自己替换 rabbitmqctl set_vm_memory_hgih_waterwmark absolute 2G ------------------------------------------------------ #当rabbitmq可用磁盘低于可用空间限制也会挂起 默认磁盘50 MB 低于50MB将挂起报警 # rabbitmq.conf disk_free_limit.relative=1.5 #建议1.0~2.0之间 # disk_free_limit.absolute=50MB #内存换页 在RabbitMQ达到内存阈值并阻塞生产者之前,会尝试将内存中的消息换页到磁盘,以释放内存空间。内存换页由换页参数控制,默认为0.5,表示当内存使用量达到内存阈值的50%时会进行换页,也就是0.4*0.5=0.2 vm_memory_high_watermark_paging_ratio=0.52.12 集群
#rabbitmq有5种模式,但集群模式是4种。详细如下:
#单一模式:
即单机情况不做集群,就单独运行一个rabbitmq而已。
#普通模式(主备模式/Warren模式):rabbitmq默认的集群模式;并发和数据量不高时使用:主节点宕机则不可用
默认模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。
# 远程模式(Shovel插件,很少使用,很老了)
远程模式可以实现双活的一种模式,简称 shovel [ˈʃʌvəl] 模式,所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制
#多活模式(Federation)--异地数据复制的主流模式
federation 插件是一个不需要构建 cluster ,而在 brokers 之间传输消息的高性能插件,federation 插件可以在 brokers 或者 cluster 之间传输消息,连接的双方可以使用不同的 users 和 virtual hosts,双方也可以使用不同版本的 rabbitMQ 和 erlang。
federation 插件使用 AMQP 协议通信,可以接受不连续的传输。federation 不是建立在集群上的,而是建立在单个节点上的,如图上黄色的 rabbit node 3 可以与绿色的 node1、node2、node3 中的任意一个利用 federation 插件进行数据同步。
#镜像模式::
1.把需要的队列做成镜像队列,存在与多个节点属于RabbitMQ的HA(Highly Available (Mirrored) Queues)方案。该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。
2.非常经典的 mirror 镜像模式,保证 100% 数据不丢失。在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式。mirror 镜像队列,目的是为了保证 rabbitMQ 数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲是 2 - 3 个节点实现数据同步。对于 100% 数据可靠性解决方案,一般是采用 3 个节点。
用 KeepAlived 做了 HA-Proxy 的高可用,然后有 3 个节点的 MQ 服务,消息发送到主节点上,主节点通过 mirror队列把数据同步到其他的 MQ 节点,这样来实现其高可靠。(*******如下图所示********)
缺点:1.这个性能开销也太大,消息同步所有机器,导致网络带宽压力和消耗很重!
2.没有扩展性可言,如果某个queue负载很重,你加机器,新增的机器也包含了这个queue的所有数据,并没有办法线性扩展queue。那么怎么开启这个镜像集群模式呢?其实很简单rabbitmq有很好的管理控制台,可以在后台新增一个策略,这个策略是镜像集群模式的策略,可以指定要求数据同步到所有节点,也可以要求同步到指定数量的节点,然后再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
#Ps:不同机器集群配置时需要配置erlang.cookie
Rabbitmq的集群是依附于erlang的集群来工作的,所以必须先构建起erlang的集群。Erlang的集群中
各节点是经由各个cookie来实现的,这个cookie存放在/var/lib/rabbitmq/.erlang.cookie中,文件是400的权限。所以必须保证各节点cookie一致,不然节点之间就无法通信.
#如果执行# rabbitmqctl stop_app 这条命令报错:需要执行
#chmod 400 .erlang.cookie
#chown rabbitmq.rabbitmq .erlang.cookie
(官方在介绍集群的文档中提到过.erlang.cookie 一般会存在这两个地址:第一个是home/.erlang.cookie;第二个地方就是/var/lib/rabbitmq/.erlang.cookie。如果我们使用解压缩方式安装部署的rabbitmq,那么这个文件会在{home}目录下,也就是$home/.erlang.cookie。如果我们使用rpm等安装包方式进行安装的,那么这个文件会在/var/lib/rabbitmq目录下。)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JvZygWis-1646568038644)(picture/20200417183048206.png)]
2.13 分布式事务#分布式事务指事务的操作位于不同的节点上,需要保证事务的 AICD 特性。 #例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务。 #解决方案:在分布式系统中,要实现分布式事务,无外乎那几种解决方案 ##1.两阶段提交(2PC) 两阶段提交(Two-phase Commit,2PC),通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。 1.1 准备阶段:协调者询问参与者事务是否执行成功,参与者发回事务执行结果。 1.2 提交阶段如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。需要注意的是,在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。 1.3 存在的问题 1.3.1 同步阻塞 所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作。 1.3.2 单点问题 协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其它操作。 1.3.3 数据不一致 在阶段二,如果协调者只发送了部分 Commit 消息,此时网络发生异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。 1.3.4 太过保守 任意一个节点失败就会导致整个事务失败,没有完善的容错机制。 ##2.补偿事务(TCC) TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为 2.1 三个阶段: Try 阶段主要是对业务系统做检测及资源预留 Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 /confirm/i阶段时,默认 /confirm/i阶段是不会出错的。即:只要Try成功,/confirm/i一定成功。 Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。 举个例子,假入 Bob 要向 Smith 转账,思路大概是: 我们有一个本地方法,里面依次调用 首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。 在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。 如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。 2.2 优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些 2.3 缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。 #3.本地消息表(异步确保) 本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。 之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。 在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作。 优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。 缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。 4.MQ 事务消息Rabbitmq分布式事务
#rabbitMQ解决分布式事务原理: 采用最终一致性原理。 需要保证以下三要素 1、确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制) 2、MQ消费者消息能够正确消费消息,采用手动ACK模式(注意重试幂等性问题) 3、如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。 场景一: 如果消费者从消息队列中获取消息时,失败了,生产者是不需要回滚事务,那么如何解决呢? 答:消费者采用手动ACK,进行补偿机制(重试3次),注意幂等性问题(全局ID) 场景二: 如何确保生产者投递消息到MQ服务器一定是成功? 答:/confirm/i机制(确认应答机制) 如果失败了,如何解决 答:使用生产者重试机制进行发消息 3、如何保证第一个事务先执行,生产者投递消息到队列成功,消费者成功从队列中取出消息,但是订单事务出现错误,发生回滚,出现了数据不一致,即派单表里有数据,订单表里因为回滚没有数据。 答:订单服务产生消息时,通过路由键,分别给派单队列和补单队列发送ID,补单队列有ID,查询订单是否创建,如果没创建,就去创建Springboot具体实现
//原理图如下
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2OGDdTet-1646568038644)(picture/1105132-20190915214721459-1096313748.png)]
#生产者:配置;引用包省略
spring:
datasource:
druid:
url: jdbc:postgresql://127.0.0.1:5432/test01?characterEncoding=utf-8
username: admin
password: 123456
driver-class-name: org.postgresql.Driver
initial-size: 1
max-active: 20
max-wait: 6000
pool-prepared-statements: true
max-pool-prepared-statement-per-connection-size: 20
connection-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=2000
min-idle: 1
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 300000
validation-query: select 1
test-while-idle: true
test-on-borrow: false
test-on-return: false
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
virtual-host: /
port: 5672
publisher-/confirm/is: true
server:
port: 8087
logging:
level:
org.springframework.jdbc.core.JdbcTemplate: DEBUG
package com.jlwj.mqtransaction.bean;
import lombok.Data;
import java.io.Serializable;
@Data
public class OrderBean implements Serializable {
private String orderNo;
private String orderInfo;
}
//发送消息类
package com.jlwj.mqtransaction.service;
import com.alibaba.fastjson.JSON;
import com.jlwj.mqtransaction.bean.OrderBean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
@Slf4j
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private JdbcTemplate jdbcTemplate;
@PostConstruct
private void initRabbitTemplate(){
//设置消息发送确认回调,发送成功后更新消息表状态
rabbitTemplate.set/confirm/iCallback((CorrelationData correlationData, boolean ack, String cause) -> {
log.info(String.valueOf(ack));
if(ack){
String sql = "update t_confirm set send_status = ? where id = ?";
jdbcTemplate.update(sql,1,correlationData.getId());
}
});
}
public void sendMessage(OrderBean orderBean){
rabbitTemplate.convertAndSend("orderExchange","orderRoutingKey", JSON.toJSONString(orderBean),
message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;},
new CorrelationData(orderBean.getOrderNo()));
}
}
//创建订单
package com.jlwj.mqtransaction.service;
import com.alibaba.fastjson.JSON;
import com.jlwj.mqtransaction.bean.OrderBean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
@Service
@Slf4j
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RabbitMQService rabbitMQService;
//注意加事务注解
@Transactional(propagation = Propagation.REQUIRED)
public void save(OrderBean orderBean){
String sql1 = "insert into t_order(order_no,order_info) values (?,?)";
String sql2 = "insert into t_confirm(id,message_info,send_status) values (?,?,?)";
jdbcTemplate.update(sql1,orderBean.getOrderNo(),orderBean.getOrderInfo());
jdbcTemplate.update(sql2,orderBean.getOrderNo(), JSON.toJSONString(orderBean),0);
rabbitMQService.sendMessage(orderBean);
}
}
//定时任务
package com.jlwj.mqtransaction.service;
import com.jlwj.mqtransaction.bean.OrderBean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@Slf4j
public class OrderScheduleService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RabbitMQService rabbitMQService;
//定时扫描记录表,将发送状态为0的消息再次发送,甚至可以记录重发次数,必要时人工干预,生产环境中需要单独部署定时任务
@Scheduled(cron ="30/30 * * * * ?" )
public void scanOrder(){
log.info("定时扫面/confirm/i表");
String sql = "select o.* from t_order o join t_confirm c on o.order_no = c.id where c.send_status = 0";
List orderBeanList = jdbcTemplate.queryForList(sql, OrderBean.class);
for (OrderBean orderBean : orderBeanList) {
rabbitMQService.sendMessage(orderBean);
}
}
}
#消费者配置:
spring:
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
virtual-host: /
port: 5672
listener:
simple:
acknowledge-mode: manual
redis:
host: 127.0.0.1
port: 6379
timeout: 5000
jedis:
pool:
max-idle: 8
min-idle: 0
max-active: 8
max-wait: 1
//配置消息队列
package com.jlwj.messageservice.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class RabbitConfig {
@Bean
public Queue dlQueue(){
return QueueBuilder.durable("dlQueue")
.build();
}
@Bean
public DirectExchange dlExchange(){
return (DirectExchange) ExchangeBuilder.directExchange("dlExchange").build();
}
@Bean
public Binding dlMessageBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dlRoutingKey");
}
@Bean
public DirectExchange messageDirectExchange() {
return (DirectExchange) ExchangeBuilder.directExchange("orderExchange")
.durable(true)
.build();
}
@Bean
public Queue messageQueue() {
return QueueBuilder.durable("orderQueue")
//配置死信
.withArgument("x-dead-letter-exchange","dlExchange")
.withArgument("x-dead-letter-routing-key","dlRoutingKey")
.build();
}
@Bean
public Binding messageBinding() {
return BindingBuilder.bind(messageQueue())
.to(messageDirectExchange())
.with("orderRoutingKey");
}
}
//监听订单
package com.jlwj.messageservice.listener;
import com.alibaba.fastjson.JSON;
import com.jlwj.messageservice.bean.OrderBean;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class OrderListener {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RabbitListener(queues = "orderQueue")
public void HandlerMessage(Channel channel, @Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
@Header(AmqpHeaders.REDELIVERED) boolean reDelivered ) throws IOException {
log.info(message);
OrderBean orderBean = JSON.parseObject(message,OrderBean.class);
try {
log.info("收到的消息为{}",JSON.toJSONString(orderBean));
//保证幂等性
if(stringRedisTemplate.opsForValue().get(orderBean.getOrderNo())==null){
sendMessage(orderBean);
stringRedisTemplate.opsForValue().set(orderBean.getOrderNo(),"1");
}
channel.basicAck(tag,false);
} catch (Exception e) {
if(reDelivered){
log.info("消息已重复处理失败:{}",message);
channel.basicReject(tag,false);
}else{
log.error("消息处理失败",e);
//重新入队一次
channel.basicNack(tag,false,true);
}
}
}
private void sendMessage(OrderBean orderBean)throws Exception{
if(orderBean.getOrderNo().equals("0007")){
int a =3/0;
}
log.info("模拟发送短信");
}
}
//监听死信队列订单:一直失败订单;也可以将订单存入mysql等数据库进行人工干预
package com.jlwj.messageservice.listener;
import com.alibaba.fastjson.JSON;
import com.jlwj.messageservice.bean.OrderBean;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class DlListener {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RabbitListener(queues = "dlQueue")
public void HandlerMessage(Channel channel, Message message, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
log.info(new String(message.getBody()));
//人工处理死信队列中的消息
handlerDl(new String(message.getBody()));
channel.basicAck(tag,false);
}
private void handlerDl(String message){
log.info("发送邮件,请求人工干预:{}",message);
}
}



