栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

JMS-从一个消费者到多个消费者

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

JMS-从一个消费者到多个消费者

如您所说,您有一些选择。

如果将其转换为主题以达到相同的效果,则需要使使用者成为永久使用者。如果您的消费者还活着,那么队列提供的一件事就是持久性。这将取决于您使用的MQ系统。

如果要坚持使用队列,则将为每个使用者和将在原始队列上侦听的调度程序创建一个队列。

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1        -> Queue_Consumer_2 <- Consumer_2        -> Queue_Consumer_3 <- Consumer_3

话题的优点

  • 动态添加新消费者更容易。所有消费者将无需任何工作即可收到新消息。
  • 您可以创建循环主题,以便Consumer_1会收到一条消息,然后是Consumer_2,然后是Consumer_3。
  • 可以向消费者推送新消息,而不必查询队列以使它们具有响应性。

主题的缺点

  • 除非您的代理支持此配置,否则消息不是持久的。如果使用者下线然后返回,则可能会丢失消息,除非设置了持久使用者。
  • 难以允许Consumer_1和Consumer_2接收消息,但不能接收Consumer_3。使用分派器和队列,分派器无法将消息放入Consumer_3的队列中。

队列的优点

  • 消息是持久的,直到使用者将其删除
  • 调度程序可以通过不将消息放入相应的使用者队列来过滤哪些使用者获得哪些消息。不过,这可以通过过滤器通过主题来完成。

队列的缺点

  • 需要创建其他队列以支持多个使用者。在动态环境中,这不会有效。

在开发消息系统时,我更喜欢主题,因为它给了我最大的权力,但是看到您已经在使用队列,就需要您更改系统实现主题的工作方式。

多用户队列系统的设计与实现

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1        -> Queue_Consumer_2 <- Consumer_2        -> Queue_Consumer_3 <- Consumer_3

资源

请记住,您还需要处理其他事情,例如问题异常处理,重新连接以及在丢失连接时重新排队等。这只是为了让您了解如何完成我的工作描述。

在实际系统中,我可能不会在第一个例外退出。我将允许系统继续以最佳状态运行并记录错误。如此代码所示,如​​果将消息放入单个使用者队列失败,则整个调度程序将停止。

分派器

package stackoverflow_4615895;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.QueueConnection;import javax.jms.QueueConnectionFactory;import javax.jms.QueueSession;import javax.jms.Session;public class Dispatcher {    private static long QUEUE_WAIT_TIME = 1000;    private boolean mStop = false;    private QueueConnectionFactory mFactory;    private String mSourceQueueName;    private String[] mConsumerQueueNames;        public Dispatcher(        QueueConnectionFactory factory,         String sourceQueue,         String[] consumerQueues) {        mFactory = factory;        mSourceQueueName = sourceQueue;        mConsumerQueueNames = consumerQueues;    }    public void start() {        Thread thread = new Thread(new Runnable() { public void run() {     Dispatcher.this.run(); }        });        thread.setName("Queue Dispatcher");        thread.start();    }    public void stop() {        mStop = true;    }    private void run() {        QueueConnection connection = null;        MessageProducer producer = null;        MessageConsumer consumer = null;        QueueSession session = null;        try { // Setup connection and queues for receiving the messages connection = mFactory.createQueueConnection(); session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE); Queue sourceQueue = session.createQueue(mSourceQueueName); consumer = session.createConsumer(sourceQueue); // Create a null producer allowing us to send messages // to any queue. producer = session.createProducer(null); // Create the destination queues based on the consumer names we // were given. Queue[] destinationQueues = new Queue[mConsumerQueueNames.length]; for (int index = 0; index < mConsumerQueueNames.length; ++index) {     destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]); } connection.start(); while (!mStop) {     // only wait QUEUE_WAIT_TIME in order to give     // the dispatcher a chance to see if it should     // quit     Message m = consumer.receive(QUEUE_WAIT_TIME);     if (m == null) {         continue;     }     // Take the message we received and put     // it in each of the consumers destination     // queues for them to process     for (Queue q : destinationQueues) {         producer.send(q, m);     } }        } catch (JMSException ex) { // Do wonderful things here         } finally { if (producer != null) {     try {         producer.close();     } catch (JMSException ex) {     } } if (consumer != null) {     try {         consumer.close();     } catch (JMSException ex) {     } } if (session != null) {     try {         session.close();     } catch (JMSException ex) {     } } if (connection != null) {     try {         connection.close();     } catch (JMSException ex) {     } }        }    }}

Main.java

    QueueConnectionFactory factory = ...;    Dispatcher dispatcher = new Dispatcher( factory, "Queue_Original", new String[]{     "Consumer_Queue_1",     "Consumer_Queue_2",     "Consumer_Queue_3"});    dispatcher.start();


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/497491.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号