启动RabbitMQ
点击你刚刚创建的amd用户
点了之后会出现下面这张图片的内容
把下面的括号都选上 然后点击close
注意生产者与消费者都要在同一网络上
// Excerpt of the producer's constructor body: the connection settings a reader
// has to adapt. The same statements appear inside the producer class listing.
生产者 this.hosts=['amqp://192.168.199.130'];// IP address of the producer
this.index=0;
this.open = amqp.connect({
hostname:"192.168.199.130",// IP address
protocol:"amqp",
port:5672,
username:"amd",// the account you just created
password:"amd"// and its password
});
let amqp = require('amqplib');
/**
 * Producer-side RabbitMQ helper built on the amqplib promise API.
 *
 * One connection promise (`open`) is shared by all sends; each send opens its
 * own short-lived channel on that connection and closes it afterwards.
 */
class RabbitMQ {
  constructor() {
    // Broker URLs tried in turn by sendQueueMsg after a connection failure.
    // NOTE(review): these URLs carry no credentials, so a fallback connection
    // presumably does not log in as the "amd" account used below - confirm.
    this.hosts = ['amqp://192.168.199.130'];
    // Position in `hosts` of the next URL to try.
    this.index = 0;
    // Promise for the shared connection; every send waits on it.
    this.open = amqp.connect({
      hostname: "192.168.199.130",
      protocol: "amqp",
      port: 5672,
      username: "amd",
      password: "amd"
    });
    // Mark the promise as handled so that a broker that is down before the
    // first send does not surface as an unhandled rejection. The failure is
    // still observed (and reported) by the next sendQueueMsg call.
    this.open.catch(() => {});
  }

  /**
   * Publish one persistent message to a queue.
   *
   * @param {string} queueName - Queue to assert and publish to.
   * @param {string} msg - Message body; converted with Buffer.from.
   * @param {function} [errCallBack] - Called once per send: with the string
   *   "success" after the message was handed to the channel, or with the
   *   error when connecting, opening the channel, asserting the queue or
   *   publishing failed.
   * @returns {Promise<void>} Settles once the attempt has finished. It rejects
   *   only if errCallBack itself throws.
   */
  async sendQueueMsg(queueName, msg, errCallBack) {
    // Remember which connection attempt this send used, so that several sends
    // failing on the same dead connection trigger a single reconnect.
    const attempted = this.open;

    let channel;
    try {
      const conn = await attempted;
      channel = await conn.createChannel();
    } catch (err) {
      if (this.open === attempted && this.hosts.length > 0) {
        // Rotate through the configured hosts, wrapping at the end.
        const host = this.hosts[this.index % this.hosts.length];
        this.index = (this.index + 1) % this.hosts.length;
        this.open = amqp.connect(host);
        this.open.catch(() => {});
      }
      if (errCallBack) {
        errCallBack(err);
      }
      return;
    }

    let outcome = "success";
    try {
      await channel.assertQueue(queueName);
      // The boolean result of sendToQueue is deliberately not used to decide
      // success: the channel must be closed and the caller notified either way.
      channel.sendToQueue(queueName, Buffer.from(msg), {
        persistent: true
      });
    } catch (err) {
      outcome = err;
    }

    try {
      await channel.close();
    } catch (closeErr) {
      // The outcome of the send is already decided at this point; close() can
      // fail when the channel is already gone (presumably after a channel
      // error), which leaves nothing further to release.
    }

    if (errCallBack) {
      errCallBack(outcome);
    }
  }
}
module.exports=RabbitMQ
使用定时器(setInterval)每秒循环生成一条消息
//ptest.js
// Driver script for the producer: publishes one JSON message per second to
// 'testQueue' and logs whatever the producer passes to the callback.
const RabbitMQ = require("./prd.js");

const producer = new RabbitMQ();
let sequence = 0;

// Builds the next numbered payload and hands it to the producer.
function publishNext() {
  sequence += 1;
  const payload = JSON.stringify({ kind: "obj", count: sequence });
  producer.sendQueueMsg('testQueue', payload, (status) => {
    console.log(status);
  });
}

setInterval(publishNext, 1000);
消费者
cus.js
constructor() {
this.hosts=['amqp://192.168.199.249']; //生产者的ip地址
this.index=0;
this.open = amqp.connect({
hostname:"192.168.199.249",//生产者的ip地址
protocol:"amqp",
port:5672,
username:"amd",
password:"amd"//与生产者的RabbitMQ的账号密码
});
}
let amqp = require('amqplib');
/**
 * Consumer-side RabbitMQ helper built on the amqplib promise API.
 *
 * One connection promise (`open`) is shared; receiveQueueMsg opens a channel
 * on it and subscribes a callback to a queue.
 */
class RabbitMQ {
  constructor() {
    // Broker URLs tried in turn by receiveQueueMsg after a failure.
    // NOTE(review): these URLs carry no credentials, so a fallback connection
    // presumably does not log in as the "amd" account used below - confirm.
    this.hosts = ['amqp://192.168.199.249'];
    // Position in `hosts` of the next URL to try.
    this.index = 0;
    // Promise for the shared connection. The account matches the "amd" user
    // the tutorial creates and that the producer logs in with.
    this.open = amqp.connect({
      hostname: "192.168.199.249",
      protocol: "amqp",
      port: 5672,
      username: "amd",
      password: "amd"
    });
    // Mark the promise as handled so a failed connection is reported through
    // receiveQueueMsg instead of surfacing as an unhandled rejection.
    this.open.catch(() => {});
  }

  /**
   * Subscribe to a queue and forward every message body to a callback.
   *
   * Each message is acknowledged before receiveCallBack runs, as in the
   * original listing.
   *
   * @param {string} queueName - Queue to assert and consume from.
   * @param {function} [receiveCallBack] - Called with the message content
   *   converted to a string.
   * @param {function} [errCallBack] - Called with the error when connecting,
   *   opening the channel, asserting the queue or subscribing fails.
   * @returns {Promise<void>} Settles once the subscription is set up or the
   *   failure has been reported.
   */
  async receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
    // Remember which connection attempt this call used, so that several calls
    // failing on the same dead connection trigger a single reconnect.
    const attempted = this.open;

    try {
      const conn = await attempted;
      const channel = await conn.createChannel();
      await channel.assertQueue(queueName);
      await channel.consume(queueName, (msg) => {
        // The callback can be invoked with null instead of a message;
        // there is nothing to acknowledge or forward in that case.
        if (msg === null) {
          return;
        }
        const data = msg.content.toString();
        channel.ack(msg);
        if (receiveCallBack) {
          receiveCallBack(data);
        }
      });
    } catch (err) {
      if (this.open === attempted && this.hosts.length > 0) {
        // Rotate through the configured hosts, wrapping at the end.
        const host = this.hosts[this.index % this.hosts.length];
        this.index = (this.index + 1) % this.hosts.length;
        this.open = amqp.connect(host);
        this.open.catch(() => {});
      }
      // NOTE(review): the subscription is not re-established automatically;
      // the caller has to call receiveQueueMsg again after a failure.
      if (errCallBack) {
        errCallBack(err);
      }
    }
  }
}
module.exports=RabbitMQ
//ctest.js
// Driver script for the consumer: subscribes to 'testQueue' and prints every
// message body it receives.
const RabbitMQ = require("./cus.js");

const consumer = new RabbitMQ();

// Logs one received message body.
function printMessage(msg) {
  console.log(msg);
}

consumer.receiveQueueMsg('testQueue', printMessage);
先运行生产者
再运行消费者



