之前我写过一篇关于rocketmq的文章,因为rabbitmq的使用更广泛,所以写一篇关于rabbitmq,及go如何操作rabbitmq的笔记。
消息队列的作用:- 异步,将同步的消息变为异步,例如我们可以使用rpc调用另一个服务,但是我们必须等待返回(同步),用mq可以变异步
- 解耦,将单体服务拆分多个微服务,实现了分布式部署,单个服务的修改、增加或删除,不影响其他服务,不需要全部服务关闭重启
- 抗压,由于是异步,解耦的,高并发请求到来时,我们不直接发送给服务,而是发给MQ,让服务决定什么时候接收消息,提供服务,这样就缓解了服务的压力
图示:
用户注册后发邮件和虚拟币:
异步解耦图:
抗压图:
拉取image:
docker pull rabbitmq:3.8-management-alpine
启动容器:
5672进行通信,15672 ,web管理工具
docker run -d --name rmq -e RABBITMQ_DEFAULT_USER=用户名 -e RABBITMQ_DEFAULT_PASS=密码 -p 15672:15672 -p 5672:5672 rabbitmq:3.8-management-alpine消息收发模式
明确连个概念,exchange(路由) queue(队列)
工作模式:
以下用p 代指生产者,用 c 代指消费者,用 x 代指 exchange
go get github.com/rabbitmq/amqp091-go收发模式2示例:
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
func main() {
conn, err := amqp.Dial("amqp://用户名:密码@IP:端口/")
if err != nil {
panic(err)
}
ch, err := conn.Channel()
if err != nil {
panic(err)
}
//durable 服务器重启还有queue autoDelete 自动删除 exclusive 独占连接,这个q别人连不上 noWait 是否等待返回的一些状态结果
//关于queue的一些设置
q, err := ch.QueueDeclare("go_q1", true, false, false, false, nil)
if err != nil {
panic(err)
}
// 开启消费者
go consume("c1",conn, q.Name)
go consume("c2",conn, q.Name)
i := 0
for {
i++
err := ch.Publish("", q.Name, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("message %d", i)),
})
if err != nil {
panic(err)
}
time.Sleep(200 * time.Millisecond)
}
}
func consume(name string,conn *amqp.Connection, q string) {
ch, err := conn.Channel()
if err != nil {
panic(err)
}
msgs, err := ch.Consume(q,name,true, false,false,false,nil)
if err != nil {
panic(err)
}
for msg := range msgs {
fmt.Printf("%s:%sn",name,msg.Body)
}
}
fanout模式示例:
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
func main() {
conn, err := amqp.Dial("amqp://用户名:密码@IP:端口/")
if err != nil {
panic(err)
}
ch, err := conn.Channel()
if err != nil {
panic(err)
}
err = ch.ExchangeDeclare("ex","fanout",true,false,false,false,nil)
if err != nil {
panic(err)
}
go subscribe(conn,"ex")
go subscribe(conn,"ex")
i := 0
for {
i++
err := ch.Publish("ex", "", false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("message %d", i)),
})
if err != nil {
panic(err)
}
time.Sleep(200 * time.Millisecond)
}
}
func subscribe(conn *amqp.Connection, ex string) {
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
q, err := ch.QueueDeclare("", false, true, false, false, nil)
if err != nil {
panic(err)
}
defer ch.QueueDelete(q.Name, false,false,false)
err = ch.QueueBind(q.Name,"",ex,false,nil)
if err != nil {
panic(err)
}
consume("c3",ch,q.Name)
}
func consume(name string,ch *amqp.Channel, q string) {
msgs, err := ch.Consume(q,name,true, false,false,false,nil)
if err != nil {
panic(err)
}
for msg := range msgs {
fmt.Printf("%s:%sn",name,msg.Body)
}
}
写代码的时候注意,收发消息,一定要在不同的channel进行,大家可以把channel认为是一个tcp连接的分割。
可以看到有一个exchange,对应2个queue。对应一条tcp连接(分成3个channel,1个向exchange发,2个从queue收)



