栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C# > C#教程

C#实现同Active MQ通讯的方法

C#教程 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

C#实现同Active MQ通讯的方法

本文实例讲述了C#实现同Active MQ通讯的方法。分享给大家供大家参考,具体如下:

内容概要:

主要以源码的形式介绍如何用C#实现同Active MQ 的通讯。本文假设你已经正确安装JDK1.6.x,了解Active MQ并有一定的编程基础。

正文:

JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS 程序格式的消息。
Message 由消息头,属性和消息体三部份组成。
Active MQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。

示例代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
using Apache.NMS;
using System.Diagnostics;
using Apache.NMS.Util;
using System.Windows.Threading;

namespace testActiveMQSubscriber
{
  /// 
  /// Interaction logic for Window1.xaml
  /// 
  public partial class Window1 : Window
  {
    private static IConnectionFactory connFac;
    private static IConnection connection;
    private static ISession session;
    private static IDestination destination;
    private static IMessageProducer producer;
    private static IMessageConsumer consumer;
    protected static ITextMessage message = null;
    public Window1()
    {
      InitializeComponent();
      initAMQ("MyFirstTopic");
    }
    private void initAMQ(String strTopicName)
    {
      try
      {
 connFac = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));
 //新建连接
 //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码
 //如果你要持久“订阅”,则需要设置ClientId,这样程序运行当中被停止,恢复运行时,能拿到没接收到的消息!
 connection.ClientId = "testing listener";
 connection = connFac.CreateConnection();//如果你是缺省方式启动Active MQ服务,则不需填用户名、密码
 //创建Session
 session = connection.CreateSession();
 //发布/订阅模式,适合一对多的情况
 destination = SessionUtil.GetDestination(session, "topic://" + strTopicName);
 //新建生产者对象
 producer = session.CreateProducer(destination);
 producer.DeliveryMode = MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留
 //新建消费者对象:普通“订阅”模式
 //consumer = session.CreateConsumer(destination);//不需要持久“订阅”
 //新建消费者对象:持久"订阅"模式:
 //  持久“订阅”后,如果你的程序被停止工作后,恢复运行,
 //从第一次持久订阅开始,没收到的消息还可以继续收
 consumer = session.CreateDurableConsumer(
   session.GetTopic(strTopicName)
   , connection.ClientId, null, false);
 //设置消息接收事件
 consumer.Listener += new MessageListener(OnMessage);
 //启动来自Active MQ的消息侦听
 connection.Start();
      }
      catch (Exception e)
      {
 //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!
 Debug.WriteLine(e.Message);
      }
    }
    private void SendMsg2Topic_Click(object sender, RoutedEventArgs e)
    {
      //发送消息
      ITextMessage request = session.CreateTextMessage(DateTime.Now.ToLocalTime()+" "+tbMsg.Text);
      producer.Send(request);
    }
    protected void onMessage(IMessage receivedMsg)
    {
      //接收消息
      message = receivedMsg as ITextMessage;
      //UI线程,显示收到的消息
      Dispatcher.Invoke(DispatcherPriority.Normal, new Action(() =>
      {
 DateTime dt = new DateTime();
 ListBoxItem lbi = new ListBoxItem();
 lbi.Content = DateTime.Now.ToLocalTime() + " " + message.Text;
 lbR.Items.Add(lbi);
      }));
    }
  }
}

队列通讯方式,消费者例子

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Apache.NMS;
using System.Diagnostics;
using log4net;
using Apache.NMS.Util;
using System.Collections;
namespace Cat8637AutoCallServer
{
  public class SMTask
  {
    public String Callee { get; set; }
    public String CheckNumber { get; set; }
    public int Deadline { get; set; }
    public override String ToString()
    {
      return String.Format("Callee={0},CheckNumber={1},Deadline={2}",
 Callee,CheckNumber,Deadline);
    }
  }
  
  public class MQClient
  {
    private static readonly ILog logger = LogManager.GetLogger(typeof(MQClient));
    private static IConnection connection = null;
    private static ISession session = null;
    Queue _voiceSMTasks = new Queue();
    public MQClient()
    {
      try
      {
 IConnectionFactory factory = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));
 //新建连接
 //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码
 connection = factory.CreateConnection();
 session = connection.CreateSession();
 IMessageConsumer consumer = session.CreateConsumer(session.GetQueue("TaskIssue_VoiceSM"));
 consumer.Listener += new MessageListener(OnMessage);
 connection.Start();
      }
      catch (Exception ex)
      {
 Debug.WriteLine(ex.Message);
      }
    }
    protected void onMessage(IMessage receivedMsg)
    {
      IMessage message = receivedMsg as ITextMessage;
      SMTask smTask = new SMTask();
      smTask.Callee = message.Properties["Callee"] as String;
      smTask.CheckNumber = message.Properties["Message"] as String;
      smTask.Deadline = Convert.ToInt32(message.Properties["deadline"] as String);
      logger.Info("Received: "+smTask.ToString());
      lock (_voiceSMTasks)
      {
 _voiceSMTasks.Enqueue(smTask);
      }
    }
    public SMTask GetVoiceSMTask()
    {
      SMTask result = null;
      lock (_voiceSMTasks)
      {
 if (_voiceSMTasks.Count > 0)
 {
   result = _voiceSMTasks.Dequeue() as SMTask;
 }
      }
      return result;
    }
  }
}

队列通讯方式,生产者例子

private void Send_Click(object sender, RoutedEventArgs e)
{
  try
  {
    IDestination destination = SessionUtil.GetDestination(session, "queue://TaskIssue_VoiceSM");
    //新建生产者对象
    IMessageProducer producer = session.CreateProducer(destination);
    producer.DeliveryMode = MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留
    ITextMessage request = session.CreateTextMessage();
    request.NMSCorrelationID = "TestVoiceSM";//这里我填了应用程序的名称。
    request.Properties["Callee"] = tbCallee.Text;
    request.Properties["Message"] = tbCheckNumber.Text;
    request.Properties["deadline"] = tbValidDuration.Text;
    producer.Send(request);
  }
  catch (Exception ex)
  {
    //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!
    Debug.WriteLine(ex.Message);
  }
}
private void Window_Closed(object sender, EventArgs e)
{
  try
  {
    if (session == null)
      return;
    //if (connection == null)
    //  return;
    session.Close();
    //connection.Close();
  }
  catch (Exception ex)
  {
    Debug.WriteLine(ex.Message);
  }
}

更多关于C#相关内容感兴趣的读者可查看本站专题:《C#窗体操作技巧汇总》、《C#常见控件用法教程》、《WinForm控件用法总结》、《C#程序设计之线程使用技巧总结》、《C#操作Excel技巧总结》、《C#中XML文件操作技巧汇总》、《C#数据结构与算法教程》、《C#数组操作技巧总结》及《C#面向对象程序设计入门教程》

希望本文所述对大家C#程序设计有所帮助。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/124115.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号