RebbitMQ基本概念
1、信道(Channel):信道是消息的生产者、消费者和服务器进行通信的虚拟连接。TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP连接的基础上构建了虚拟的信道。我们尽量重复使用TCP连接,而信道则是可以用完了就关闭。
2、队列(Queue):用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取数据。
3、交换机(Exchange):把消息路由到一个或者多个队列中。
routing模式介绍
生产者把消息发布到交换机中,消息携带一个routingKey属性,交换机会根据routingKey的值把消息发送到一个或者多个队列,消费者会从队列中获取消息,交换机和队列都位于RabbitMQ服务器内部。
优点:即使消费者不在线,消费者相关的消息也会被保存在队列中,当消费者上线之后,消费者就可以获取到离线期间错过的消息。
1、安装ErlangOTP,下载地址:http://www.erlang.org/downloads
2、安装RabbitMQ,下载地址:http://www.rabbitmq.com/
3、创建两个控制台应用程序,SenderTest和ConsumerTest
4、打开程序包管理器控制台,执行Install-Package RabbitMQ.Client命令,引入RabbitMQ的开源类库
生产者端步骤:
(1):创建ConnectionFactory,并且设置一些参数,比如hostname,portNumber等等
(2):利用ConnectionFactory创建一个Connection连接
(3):利用Connection创建一个Channel通道
(4):创建消息,并且发送到交换机中
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
namespace SenderTest
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory();
//RabbitMQ服务器地址
factory.HostName = "127.0.0.1";
//消息异步转发
factory.DispatchConsumersAsync = true;
//交换机名称
string exchangeName = "exchange1";
//routingKey的值
string eventName = "key1";
//连接服务器
using var conn = factory.CreateConnection();
while (true)
{
//待发送的消息
string msg = DateTime.Now.ToLongTimeString();
//创建信道
using (var channel = conn.CreateModel())
{
//消息属性
var properties = channel.CreateBasicProperties();
//传输模式,1:非持久化,2:持久化
properties.DeliveryMode = 2;
//声明交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
byte[] body = Encoding.UTF8.GetBytes(msg);
//生产消息
channel.BasicPublish(exchange: exchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body);
}
Console.WriteLine("发送消息:" + msg);
Thread.Sleep(1000);
}
}
}
}
消费者端步骤:
(1):创建ConnectionFactory,并且设置一些参数,比如hostname,portNumber等等
(2):利用ConnectionFactory创建一个Connection连接
(3):利用Connection创建一个Channel通道
(4):将queue和Channel进行绑定,注意这里的queue名字要和前面生产者端创建的queue一致
(5):创建消费者Consumer来接收消息,同时将消费者和queue进行绑定
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading.Tasks;
namespace ConsumerTest
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory();
//RabbitMQ服务器地址
factory.HostName = "127.0.0.1";
//消息异步转发
factory.DispatchConsumersAsync = true;
//交换机名称
string exchangeName = "exchange1";
//routingKey的值
string eventName = "key1";
using var conn = factory.CreateConnection();
//创建信道
using var channel = conn.CreateModel();
//声明交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
//声明队列
string queueName = "queue1";
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
//将队列绑定到交换机中
channel.QueueBind(queueName, exchangeName, eventName);
//消费者拉取消息
AsyncEventingBasicConsumer consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
//消费消息
channel.BasicConsume(queueName, autoAck: false, consumer);
Console.WriteLine("按回车退出");
Console.ReadLine();
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
try
{
byte[] bytes = args.Body.ToArray();
string msg = Encoding.UTF8.GetString(bytes);
Console.WriteLine(DateTime.Now.ToLongTimeString() + "收到了信息" + msg);
//消息确认处理,DeliveryTag:消息的编号
channel.BasicAck(args.DeliveryTag, multiple: false);
await Task.Delay(800);
}
catch (Exception ex)
{
//对没有确认处理的消息进行消息重发
channel.BasicReject(args.DeliveryTag, true);
Console.WriteLine(DateTime.Now.ToLongTimeString() + "处理收到的消息出错:" + ex);
}
}
}
}
}



