如您所说,您有一些选择。
如果将其转换为主题以达到相同的效果,则需要使使用者成为永久使用者。如果您的消费者还活着,那么队列提供的一件事就是持久性。这将取决于您使用的MQ系统。
如果要坚持使用队列,则将为每个使用者和将在原始队列上侦听的调度程序创建一个队列。
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 -> Queue_Consumer_2 <- Consumer_2 -> Queue_Consumer_3 <- Consumer_3
话题的优点
- 动态添加新消费者更容易。所有消费者将无需任何工作即可收到新消息。
- 您可以创建循环主题,以便Consumer_1会收到一条消息,然后是Consumer_2,然后是Consumer_3。
- 可以向消费者推送新消息,而不必查询队列以使它们具有响应性。
主题的缺点
- 除非您的代理支持此配置,否则消息不是持久的。如果使用者下线然后返回,则可能会丢失消息,除非设置了持久使用者。
- 难以允许Consumer_1和Consumer_2接收消息,但不能接收Consumer_3。使用分派器和队列,分派器无法将消息放入Consumer_3的队列中。
队列的优点
- 消息是持久的,直到使用者将其删除
- 调度程序可以通过不将消息放入相应的使用者队列来过滤哪些使用者获得哪些消息。不过,这可以通过过滤器通过主题来完成。
队列的缺点
- 需要创建其他队列以支持多个使用者。在动态环境中,这不会有效。
在开发消息系统时,我更喜欢主题,因为它给了我最大的权力,但是看到您已经在使用队列,就需要您更改系统实现主题的工作方式。
多用户队列系统的设计与实现
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 -> Queue_Consumer_2 <- Consumer_2 -> Queue_Consumer_3 <- Consumer_3
资源
请记住,您还需要处理其他事情,例如问题异常处理,重新连接以及在丢失连接时重新排队等。这只是为了让您了解如何完成我的工作描述。
在实际系统中,我可能不会在第一个例外退出。我将允许系统继续以最佳状态运行并记录错误。如此代码所示,如果将消息放入单个使用者队列失败,则整个调度程序将停止。
分派器
package stackoverflow_4615895;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.QueueConnection;import javax.jms.QueueConnectionFactory;import javax.jms.QueueSession;import javax.jms.Session;public class Dispatcher { private static long QUEUE_WAIT_TIME = 1000; private boolean mStop = false; private QueueConnectionFactory mFactory; private String mSourceQueueName; private String[] mConsumerQueueNames; public Dispatcher( QueueConnectionFactory factory, String sourceQueue, String[] consumerQueues) { mFactory = factory; mSourceQueueName = sourceQueue; mConsumerQueueNames = consumerQueues; } public void start() { Thread thread = new Thread(new Runnable() { public void run() { Dispatcher.this.run(); } }); thread.setName("Queue Dispatcher"); thread.start(); } public void stop() { mStop = true; } private void run() { QueueConnection connection = null; MessageProducer producer = null; MessageConsumer consumer = null; QueueSession session = null; try { // Setup connection and queues for receiving the messages connection = mFactory.createQueueConnection(); session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE); Queue sourceQueue = session.createQueue(mSourceQueueName); consumer = session.createConsumer(sourceQueue); // Create a null producer allowing us to send messages // to any queue. producer = session.createProducer(null); // Create the destination queues based on the consumer names we // were given. Queue[] destinationQueues = new Queue[mConsumerQueueNames.length]; for (int index = 0; index < mConsumerQueueNames.length; ++index) { destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]); } connection.start(); while (!mStop) { // only wait QUEUE_WAIT_TIME in order to give // the dispatcher a chance to see if it should // quit Message m = consumer.receive(QUEUE_WAIT_TIME); if (m == null) { continue; } // Take the message we received and put // it in each of the consumers destination // queues for them to process for (Queue q : destinationQueues) { producer.send(q, m); } } } catch (JMSException ex) { // Do wonderful things here } finally { if (producer != null) { try { producer.close(); } catch (JMSException ex) { } } if (consumer != null) { try { consumer.close(); } catch (JMSException ex) { } } if (session != null) { try { session.close(); } catch (JMSException ex) { } } if (connection != null) { try { connection.close(); } catch (JMSException ex) { } } } }}Main.java
QueueConnectionFactory factory = ...; Dispatcher dispatcher = new Dispatcher( factory, "Queue_Original", new String[]{ "Consumer_Queue_1", "Consumer_Queue_2", "Consumer_Queue_3"}); dispatcher.start();


