栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ的添加用户与权限 生产者与消费者

RabbitMQ的添加用户与权限 生产者与消费者

RabbitMq 添加用户

启动RabbitMQ

点击你刚刚创建的amd用户

点了之后会出现下面这张图片的内容


把下面的括号都选上 然后点击close

生产者与消费者

注意生产者与消费者都要在同一网络上

生产者
  this.hosts=['amqp://192.168.199.130'];//生产者的ip地址
        this.index=0;
        this.open = amqp.connect({
            hostname:"192.168.199.130",//IP地址
            protocol:"amqp",
            port:5672,
            username:"amd",// 你刚刚创建的账号
            password:"amd"//与密码
        });
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts=['amqp://192.168.199.130'];
        this.index=0;
        this.open = amqp.connect({
            hostname:"192.168.199.130",
            protocol:"amqp",
            port:5672,
            username:"amd",
            password:"amd"
        });
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, Buffer.from(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

module.exports=RabbitMQ 

if循环生成消息

//ptest.js
var RabbitMQ=require("./prd.js")
let mq = new RabbitMQ();
var count=0;
setInterval(()=>{
    count++;
    mq.sendQueueMsg('testQueue', JSON.stringify({kind:"obj",count:count}), (error) => {
        console.log(error)
    })

},1000);
消费者

cus.js

  constructor() {
      this.hosts=['amqp://192.168.199.249']; //生产者的ip地址
      this.index=0;
        this.open = amqp.connect({
         hostname:"192.168.199.249",//生产者的ip地址
         protocol:"amqp",
         port:5672,
         username:"amd",
         password:"amd"//与生产者的RabbitMQ的账号密码
         });
    }
let amqp = require('amqplib');

class RabbitMQ{
    constructor() {
      this.hosts=['amqp://192.168.199.249'];
      this.index=0;
        this.open = amqp.connect({
         hostname:"192.168.199.249",
         protocol:"amqp",
         port:5672,
         username:"admin",
         password:"admin"
         });
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;
        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                      //  channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }
}

module.exports=RabbitMQ 
//ctest.js
var RabbitMQ=require("./cus.js")
let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) => 
{    
   console.log(msg);
})

先运行生产者

在运行消费者

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/629247.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号