栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

golang之消息队列rabbitmq

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

golang之消息队列rabbitmq

之前我写过一篇关于rocketmq的文章,因为rabbitmq的使用更广泛,所以写一篇关于rabbitmq,及go如何操作rabbitmq的笔记。

消息队列的作用:
  1. 异步,将同步的消息变为异步,例如我们可以使用rpc调用另一个服务,但是我们必须等待返回(同步),用mq可以变异步
  2. 解耦,将单体服务拆分多个微服务,实现了分布式部署,单个服务的修改、增加或删除,不影响其他服务,不需要全部服务关闭重启
  3. 抗压,由于是异步,解耦的,高并发请求到来时,我们不直接发送给服务,而是发给MQ,让服务决定什么时候接收消息,提供服务,这样就缓解了服务的压力
    图示:
    用户注册后发邮件和虚拟币:
    异步解耦图:

    抗压图:
docker安装

拉取image:

docker pull rabbitmq:3.8-management-alpine

启动容器:
5672进行通信,15672 ,web管理工具

docker run -d --name rmq 
-e RABBITMQ_DEFAULT_USER=用户名 
-e RABBITMQ_DEFAULT_PASS=密码 
-p 15672:15672 
-p 5672:5672 
rabbitmq:3.8-management-alpine
消息收发模式

明确连个概念,exchange(路由) queue(队列)
工作模式:

以下用p 代指生产者,用 c 代指消费者,用 x 代指 exchange

1.p发给队列,单个c消费

2.p发给队列,多个c消费,q消息只能被被c消费,不可能q的一个消息被消费两次

3.fandout模式:p将消息发给x,x将同一个消息发给所有q,c 按 1,2方式消费q的消息

4.direct(路由)模式:p 按照路由Routing Key将消息发给q,(一个消息可能发给多个q),c 按 1,2方式消费q的消息 5.topic模式:p 按照路由Routing Key将消息发给q,(一个消息可能发给多个q),c 按 1,2方式消费q的消息,与4的区别是topic可以有通配符匹配 用go操作rabbitmq
go get github.com/rabbitmq/amqp091-go
收发模式2示例:
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"time"
)

func main() {
	conn, err := amqp.Dial("amqp://用户名:密码@IP:端口/")
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	//durable 服务器重启还有queue  autoDelete 自动删除 exclusive 独占连接,这个q别人连不上 noWait 是否等待返回的一些状态结果
	//关于queue的一些设置
	q, err := ch.QueueDeclare("go_q1", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	// 开启消费者
	go consume("c1",conn, q.Name)
	go consume("c2",conn, q.Name)

	i := 0
	for {
		i++
		err := ch.Publish("", q.Name, false, false, amqp.Publishing{
			Body: []byte(fmt.Sprintf("message %d", i)),
		})
		if err != nil {
			panic(err)
		}
		time.Sleep(200 * time.Millisecond)
	}
}

func consume(name string,conn *amqp.Connection, q string)  {
	ch, err :=  conn.Channel()
	if err != nil {
		panic(err)
	}
	msgs, err := ch.Consume(q,name,true, false,false,false,nil)
	if err != nil {
		panic(err)
	}

	for msg := range msgs {
		fmt.Printf("%s:%sn",name,msg.Body)
	}
}
fanout模式示例:
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"time"
)

func main() {
	conn, err := amqp.Dial("amqp://用户名:密码@IP:端口/")
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}

	err = ch.ExchangeDeclare("ex","fanout",true,false,false,false,nil)
	if err != nil {
		panic(err)
	}

	go subscribe(conn,"ex")
	go subscribe(conn,"ex")

	i := 0
	for {
		i++
		err := ch.Publish("ex", "", false, false, amqp.Publishing{
			Body: []byte(fmt.Sprintf("message %d", i)),
		})
		if err != nil {
			panic(err)
		}
		time.Sleep(200 * time.Millisecond)
	}
}

func subscribe(conn *amqp.Connection, ex string) {
	ch, err :=  conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()

	q, err := ch.QueueDeclare("", false, true, false, false, nil)
	if err != nil {
		panic(err)
	}
	defer ch.QueueDelete(q.Name, false,false,false)
	err = ch.QueueBind(q.Name,"",ex,false,nil)
	if err != nil {
		panic(err)
	}
	consume("c3",ch,q.Name)

}

func consume(name string,ch *amqp.Channel, q string)  {
	msgs, err := ch.Consume(q,name,true, false,false,false,nil)
	if err != nil {
		panic(err)
	}

	for msg := range msgs {
		fmt.Printf("%s:%sn",name,msg.Body)
	}
}

写代码的时候注意,收发消息,一定要在不同的channel进行,大家可以把channel认为是一个tcp连接的分割。
可以看到有一个exchange,对应2个queue。对应一条tcp连接(分成3个channel,1个向exchange发,2个从queue收)


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/336578.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号