栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【C#】RebbitMQ实例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【C#】RebbitMQ实例

RebbitMQ基本概念
1、信道(Channel):信道是消息的生产者、消费者和服务器进行通信的虚拟连接。TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP连接的基础上构建了虚拟的信道。我们尽量重复使用TCP连接,而信道则是可以用完了就关闭。
2、队列(Queue):用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取数据。
3、交换机(Exchange):把消息路由到一个或者多个队列中。

routing模式介绍
生产者把消息发布到交换机中,消息携带一个routingKey属性,交换机会根据routingKey的值把消息发送到一个或者多个队列,消费者会从队列中获取消息,交换机和队列都位于RabbitMQ服务器内部。
优点:即使消费者不在线,消费者相关的消息也会被保存在队列中,当消费者上线之后,消费者就可以获取到离线期间错过的消息。

1、安装ErlangOTP,下载地址:http://www.erlang.org/downloads

2、安装RabbitMQ,下载地址:http://www.rabbitmq.com/

3、创建两个控制台应用程序,SenderTest和ConsumerTest

4、打开程序包管理器控制台,执行Install-Package RabbitMQ.Client命令,引入RabbitMQ的开源类库

生产者端步骤:

    (1):创建ConnectionFactory,并且设置一些参数,比如hostname,portNumber等等

    (2):利用ConnectionFactory创建一个Connection连接

    (3):利用Connection创建一个Channel通道

    (4):创建消息,并且发送到交换机中

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;

namespace SenderTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            //RabbitMQ服务器地址
            factory.HostName = "127.0.0.1";
            //消息异步转发
            factory.DispatchConsumersAsync = true;
            //交换机名称
            string exchangeName = "exchange1";
            //routingKey的值
            string eventName = "key1";
            //连接服务器
            using var conn = factory.CreateConnection();
            while (true)
            {
                //待发送的消息
                string msg = DateTime.Now.ToLongTimeString();
                //创建信道
                using (var channel = conn.CreateModel())
                {
                    //消息属性
                    var properties = channel.CreateBasicProperties();
                    //传输模式,1:非持久化,2:持久化
                    properties.DeliveryMode = 2;
                    //声明交换机
                    channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
                    byte[] body = Encoding.UTF8.GetBytes(msg);
                    //生产消息
                    channel.BasicPublish(exchange: exchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body);
                }
                Console.WriteLine("发送消息:" + msg);
                Thread.Sleep(1000);
            }
        }
    }
}

消费者端步骤:

     (1):创建ConnectionFactory,并且设置一些参数,比如hostname,portNumber等等

     (2):利用ConnectionFactory创建一个Connection连接

     (3):利用Connection创建一个Channel通道

     (4):将queue和Channel进行绑定,注意这里的queue名字要和前面生产者端创建的queue一致

     (5):创建消费者Consumer来接收消息,同时将消费者和queue进行绑定

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading.Tasks;

namespace ConsumerTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            //RabbitMQ服务器地址
            factory.HostName = "127.0.0.1";
            //消息异步转发
            factory.DispatchConsumersAsync = true;
            //交换机名称
            string exchangeName = "exchange1";
            //routingKey的值
            string eventName = "key1";
            using var conn = factory.CreateConnection();
            //创建信道
            using var channel = conn.CreateModel();
            //声明交换机
            channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
            //声明队列
            string queueName = "queue1";
            channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            //将队列绑定到交换机中
            channel.QueueBind(queueName, exchangeName, eventName);
            //消费者拉取消息
            AsyncEventingBasicConsumer consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += Consumer_Received;
            //消费消息
            channel.BasicConsume(queueName, autoAck: false, consumer);
            Console.WriteLine("按回车退出");
            Console.ReadLine();
            async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
            {
                try
                {
                    byte[] bytes = args.Body.ToArray();
                    string msg = Encoding.UTF8.GetString(bytes);
                    Console.WriteLine(DateTime.Now.ToLongTimeString() + "收到了信息" + msg);
                    //消息确认处理,DeliveryTag:消息的编号
                    channel.BasicAck(args.DeliveryTag, multiple: false);
                    await Task.Delay(800);
                }
                catch (Exception ex)
                {
                    //对没有确认处理的消息进行消息重发
                    channel.BasicReject(args.DeliveryTag, true);
                    Console.WriteLine(DateTime.Now.ToLongTimeString() + "处理收到的消息出错:" + ex);
                }
            }
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/769591.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号