栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

NodeJS之消息队列RabbitMQ

NodeJS之消息队列RabbitMQ

介绍
消息队列是一个消息代理,负责接受和转发消息,它的消息发布者和使用者不需要知道对方的存在。消息发布者(生产者)只管把消息发布到队列中而不用管是谁来取,消息使用者(消费者)只管从队列中取消息而不需要管是谁发布的。
生产者、消费者、消息队列不必在同一个主机上
安装
需要先安装RabbitMQ的依赖项Erlang,Erlang 必须使用管理帐户安装,否则 RabbitMQ将无法发现它。
Erlang下载路径:
https://www.erlang.org/downloads
RabbitMQ下载路径:
https://www.rabbitmq.com/install-windows.html#installer
注:环境变量Path要加安装Erlang下bin的路径,如E:MQerl-24.2.1bin
启动本地消息队列:
双击rabbitMQ—>sbin---->rabbitmq-server.bat,默认用户名密码guest/guest


使用amqplib操作RabbitMQ

npm install amqplib --save
const amqp = require('amqplib/callback_api');
const amqpurl = {
    protocol: 'amqp',
    hostname: 'localhost',
    port: 5672,
    username: 'guest',
    password: 'guest',
    vhost: '/'
}
const rabbitmqService = {};
rabbitmqService.findFromQueue = (QUEUE, receiveCallBack) => {
    amqp.connect(amqpurl, (connError, connection) => {
        console.log('正在监听队列...')
        if (connError) {
            throw connError;
        }
        connection.createChannel((channelError, channel) => {
            if (channelError) {
                throw channelError;
            }
            channel.assertQueue(QUEUE, { durable: true });
            // 消费者
            channel.consume(QUEUE, function (msg) {
                try {
                    // 是否确认延迟到操作代码里面,避免消息丢失
                    receiveCallBack && receiveCallBack(JSON.parse(msg.content), channel, msg);
                } catch (error) {
                    channel.nack(msg, false, true);
                }
            })
        })
    });
}
rabbitmqService.sendToQueue = (QUEUE, content) => {
    amqp.connect(amqpurl, (connError, connection) => {
        if (connError) {
            throw connError;
        }
        connection.createChannel((channelError, channel) => {
            if (channelError) {
                throw channelError;
            }
            channel.assertQueue(QUEUE, { durable: true });
            channel.sendToQueue(QUEUE, Buffer.from(JSON.stringify(content)));
            channel.close(function () {
                connection.close();
            })
        });
    });
}
module.exports = rabbitmqService;

消费者

rabbitmqService.findFromQueue(QUEUENAME, async (content, channel, msg) => {
	try {
		console.log(content);
		channel.ack(msg, false);
	} catch (error) {
		channel.nack(msg, false, true);
	}
})
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758185.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号