栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbmitmq连接池[已过生产]

rabbmitmq连接池[已过生产]


源码地址https://gitee.com/tym_hmm/rabbitmq-pool-go

rabbitmq 连接池channel复用

开发语言 golang 依赖库

go get -u gitee.com/tym_hmm/rabbitmq-pool-go

go get -u gitee.com/tym_hmm/rabbitmq-pool-go

已在线上生产环镜运行, 5200W请求 qbs 3000 时, 连接池显示无压力
rabbitmq部署为线上集群

功能说明
    自定义连接池大小及最大处理channel数消费者底层断线自动重连底层使用轮循方式复用tcp生产者每个tcp对应一个channel,防止channel写入阻塞造成内存使用过量支持rabbitmq exchangeType默认值
名称说明
tcp最大连接数5
生产者消费发送失败最大重试次数5
消费者最大channel信道数(每个连接自动平分)100(每个tcp10个)
使用
    初始化
var oncePool sync.once
var instanceRPool *kelleyRabbimqPool.RabbitPool
func initrabbitmq() *kelleyRabbimqPool.RabbitPool {
  oncePool.Do(func() {
  //初始化生产者
  instanceRPool = kelleyRabbimqPool.NewProductPool()
  //初始化消费者
  instanceConsumePool = kelleyRabbimqPool.NewConsumePool()
    err := instanceRPool.Connect("192.168.1.202", 5672, "guest", "guest")
    if err != nil {
      fmt.Println(err)
    }
  })
  return instanceRPool
}
    生产者
var wg sync.WaitGroup
for i:=0;i<100000; i++ {
  wg.Add(1)
  go func(num int) {
    defer wg.Done()
    data:=kelleyRabbimqPool.GetRabbitMqDataFormat(
      "testChange5", 
      kelleyRabbimqPool.EXCHANGE_TYPE_TOPIC, 
      "textQueue5", 
      "/", 
      fmt.Sprintf("这里是数据%d", num)
    )
    _=instanceRPool.Push(data)
  }(i)
}
wg.Wait()
    消费者

可定义多个消息者事件, 不通交换机, 队列, 路由

每个事件独立

nomrl := &rabbitmq.ConsumeReceive{
#定义消费者事件
	ExchangeName: "testChange31",//队列名称
        ExchangeType: kelleyRabbimqPool.EXCHANGE_TYPE_DIRECT,
        Route:        "",
        QueueName:    "testQueue31",
        IsTry:true,//是否重试
        MaxReTry: 5,//最大重试次数
        EventFail: func(code int, e error, data []byte) {
        	fmt.Printf("error:%s", e)
        },
        
        EventSuccess: func(data []byte, header map[string]interface{},retryClient kelleyRabbimqPool.RetryClientInterface)bool {//如果返回true 则无需重试
        	fmt.Printf("data:%sn", string(data))
        	return true
        },
}
instanceConsumePool.RegisterConsumeReceive(nomrl)

err := instanceConsumePool.RunConsume()
if err != nil {
  fmt.Println(err)
}
    错误码说明

错误码为

    生产者push时返回的 *RabbitMqError消费者事件监听回返的 code
错误码说明
501生产者发送超过最大重试次数
502获取信道失败, 一般为认道队列数用尽
503交换机/队列/绑定失败
504连接失败
506信道创建失败
507超过最大重试次数
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/707126.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号