下载地址
Erlang https://www.erlang-solutions.com/resources/download.html
rabbitMQ http://www.rabbitmq.com/download.html
其实坑很多的,要找到对应的版本号
配置方法:
• 安装完事儿后要记得配置一下系统的环境变量。
此电脑–>鼠标右键“属性”–>高级系统设置–>环境变量–>“新建”系统环境变量
变量名:ERLANG_HOME
变量值就是刚才erlang的安装地址,点击确定。
然后双击系统变量path
点击“新建”,将%ERLANG_HOME%bin加入到path中。
• 最后windows键+R键,输入cmd,再输入erl,看到版本号就说明erlang安装成功了。
• 双击下载后的.exe文件,安装过程与erlang的安装过程相同。
• RabbitMQ安装好后接下来安装RabbitMQ-Plugins。打开命令行cd,输入RabbitMQ的sbin目录。
我的目录是:D:Program FilesRabbitMQ Serverrabbitmq_server-3.7.3sbin
然后在后面输入rabbitmq-plugins enable rabbitmq_management命令进行安装
等几秒钟看到这个界面后,访问http://localhost:15672
默认用户名和密码都是guest
生产:
ConnectionFactory connection = new ConnectionFactory();
connection.HostName = "localhost";
connection.UserName = "guest";
connection.Password = "guest";
using (var models= connection.CreateConnection())
{
//创建一个信道
using (IModel channel = models.CreateModel())
{
//删除一下
channel.ExchangeDelete("kso");
channel.QueueDelete("queuekso1");
channel.QueueDelete("queuekso2");
// 创建交换机
channel.ExchangeDeclare("kso",ExchangeType.Direct);
//创建队列
channel.QueueDeclare("queuekso1",true,false,false);
channel.QueueDeclare("queuekso2",true,false,false);
//绑定
channel.QueueBind("queuekso1","kso","advancd");
channel.QueueBind("queuekso2","kso","advancd");
Console.BackgroundColor = ConsoleColor.Blue;
for (int i = 0; i < 1000; i++)
{
IBasicProperties basicProperties = channel.CreateBasicProperties();
basicProperties.Persistent = true;
channel.BasicPublish("kso",
"advancd", basicProperties,
Encoding.UTF8.GetBytes("这是一个生产这生产的消息"+i)
) ;
Console.WriteLine("这是一个生产这生产的消息" + i);
}
while (true)
{
Console.WriteLine("输入消息!");
string name = Console.ReadLine();
IBasicProperties basicProperties = channel.CreateBasicProperties();
basicProperties.Persistent = true;
channel.BasicPublish("kso","advancd",basicProperties,Encoding.UTF8.GetBytes("这是自己输入的消息"+name));
Console.WriteLine("这是自己输入的消息"+name);
}
消费者代码
public void Customtion()
{
var factory = new ConnectionFactory();
factory.HostName = "localhost";//RabbitMQ服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
using (var connet = factory.CreateConnection())
{
using (var channel = connet.CreateModel())
{
//创建交换机
channel.ExchangeDeclare("kso", type: ExchangeType.Direct, durable: false, autoDelete: false, arguments: null);
channel.QueueDeclare("queuekso1", true, false, false, null);
channel.QueueDeclare("queuekso2", true, false, false, null);
channel.QueueBind("queuekso1", "kso", "advancd", null);
channel.QueueBind("queuekso2", "kso", "advancd", null);
RabbitMQ.Client.Events.EventingBasicConsumer eventingBasicConsumer = new RabbitMQ.Client.Events.EventingBasicConsumer(channel);
eventingBasicConsumer.Received += (sender, e) =>
{
var boy = e.Body;
Console.WriteLine(Encoding.UTF8.GetString(boy.ToArray()));
};
channel.BasicConsume(queue: "queuekso1", autoAck: true, consumer: eventingBasicConsumer);
channel.BasicConsume(queue: "queuekso2", autoAck: true, consumer: eventingBasicConsumer);
Console.WriteLine(" 结束了");
Console.ReadKey();
}
}
}
RabbitMQ事务支持
生产者的简单事务代码
channel./confirm/iSelect();
channel.BasicPublish("kso", item.LogType, properties, item.Msg);
Console.WriteLine(Encoding.UTF8.GetString(item.Msg));
if (channel.WaitFor/confirm/is()) //如果一条消息或多消息都确认发送,
{
Console.WriteLine($"【{item}】发送到Broke成功!");
}
else
{
//可以记录个日志,重试一下;
}
channel.WaitFor/confirm/isOrDie(); 这个代表着发送消息如果失败则直接关闭信道不能再次尝试
消费者事务支持
//如果在这里处理消息的手,异常了呢?
//Console.WriteLine($"接收到消息:{message}"); ;
if (i < 50)
{
//手动确认 消息正常消费 告诉Broker:你可以把当前这条消息删除掉了
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine(message);
}
else
{
//否定:告诉Broker,这个消息我没有正常消费; requeue: true:重新写入到队列里去; false:你还是删除掉;
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
}



