- 一、在 ESB 中配置 ActiveMQ
- 1、复制库文件
- ①、ESB 配置
- ②、ActiveMQ 配置
- 2、在 ESB 中配置 JMS 传输侦听器和发送器
- ①、设置 JMS 侦听器
- ②、设置 JMS 发送器
- 二、向 JMS 队列发送消息
- 1、创建消息中介构件
- 2、REST API 配置文件
- 3、AddressEndpoint 配置
- 4、测试配置
- 三、监听 JMS 队列中的消息
- 1、创建消息中介构件
- 2、代理服务配置文件
- 3、测试配置
- 四、示例程序
- 1、消息生产者
- 2、消息消费者
- ①、Consumer
- ②、MessageListenerCallBack
WSO2 ESB 的 Java 消息服务 (JMS) 传输可以轻松地向任何实现 JMS 规范的 JMS 服务的队列和主题发送和接收消息。
JMS 传输实现来自 WS-Commons Transports 项目,它利用 JNDI 连接到各种 JMS 代理。 因此,WSO2 ESB 可以与任何提供 JNDI 支持的 JMS 代理一起工作。 所有相关类都打包到axis2-transport-jms-
JMS 传输实现需要一个活动的 JMS 服务器实例才能接收和发送消息。 建议使用 WSO2 Message Broker 或 Apache ActiveMQ,但也支持其他,例如 Apache Qpid 和 Tibco。
一、在 ESB 中配置 ActiveMQ 1、复制库文件 ①、ESB 配置将以下库文件从 /lib 目录复制到
ActiveMQ 5.8.0 及以上
- activemq-broker-5.8.0.jar
- activemq-client-5.8.0.jar
- activemq-kahadb-store-5.8.0.jar
- geronimo-jms_1.1_spec-1.1.1.jar
- geronimo-j2ee-management_1.1_spec-1.0.1.jar
- geronimo-jta_1.0.1B_spec-1.0.1.jar
- hawtbuf-1.9.jar
- Slf4j-api-1.6.6.jar
- activeio-core-3.1.4.jar(在 /lib/optional 文件夹中)
低版本的 ActiveMQ
- activemq-core-5.5.1.jar
- geronimo-j2ee-management_1.0_spec-1.0.jar
- geronimo-jms_1.1_spec-1.1.1.jar
ActiveMQ 应该在启动 ESB 之前启动并运行
②、ActiveMQ 配置修改/conf/jetty.xml配置文件,不修改的话除本机外无法访问管理控制台,找到 jettyPortbean,将 host 属性从原来的 127.0.0.1 改为 0.0.0.0,修改完成后保存。
要启用 JMS 传输侦听器,请取消注释
②、设置 JMS 发送器org.apache.activemq.jndi.ActiveMQInitialContextFactory tcp://ActiveMQ服务器地址:61616 TopicConnectionFactory topic org.apache.activemq.jndi.ActiveMQInitialContextFactory tcp://ActiveMQ服务器地址:61616 QueueConnectionFactory queue org.apache.activemq.jndi.ActiveMQInitialContextFactory tcp://ActiveMQ服务器地址:61616 QueueConnectionFactory queue
要启用 JMS 传输发送器,请取消注释
上述配置并没有解决 ActiveMQ 消息代理的瞬时故障问题。假设由于某种原因 ActiveMQ 宕机了一段时间后又恢复了。 ESB 不会重新连接到 ActiveMQ,而是在向 ESB 发送请求时抛出一些错误,直到重新启动。 为了解决这个问题,需要在 java.naming.provider.url 的位置添加以下配置:failover:tcp://localhost:61616
这只会确保发生重新连接。
HTTP 客户端向 ESB 代理服务发送请求,代理服务会把 HTTP 客户端的请求体转发到 ActiveMQ 中指定的队列。(注意,该 HTTP 请求是没有响应消息的)
在 inSequence 中,该 OUT_ONLY 属性设置为 true 以指示消息交换是单向的。
3、AddressEndpoint 配置
URI:jms:/队列名称?transport.jms.ConnectionFactoryJNDIName=JMS侦听器配置的jdni名称&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://ActiveMQ服务器地址:61616&transport.jms.DestinationType=queue
使用 postman 向 api 发送消息,注意,这个是没有响应消息的!!!
在 ActiveMQ 管理控制台可以看到待消费的消息
消费消息
ESB 在 ActiveMQ 中创建一个和代理服务同名的队列,同时监听该队列中的消息,如果有生产者向该队列生产消息,ESB 会把队列中的消息转发到代理服务指定的 URL。
OUT_ONLY 属性设置为 true 以指示消息交换是单向的。
将传输设置为 jms 来使代理服务成为 JMS 侦听器。 一旦为代理服务启用了 JMS 传输,ESB 就会开始侦听与代理服务同名的 JMS 队列。
此示例配置,ESB 会侦听名为 JMSConsumerProxy 的 JMS 队列。 要使代理服务侦听不同的 JMS 队列,请使用目标队列的名称定义 transport.jms.Destination 参数。
在此示例配置, ActiveMQ生产的消息将被发送到后端服务,ESB 不会等待服务的响应。
发布代理服务后,ESB 会在 ActiveMQ 中创建一个和代理服务同名的队列
3、测试配置
ActiveMQ 控制台可以看到被监听的队列
库文件 /activemq-all-x.x.x.jar
package queue.producer;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producer {
private static final String DEFAULT_BROKER_HOST = "ActiveMQ服务器地址";
private static final String DEFAULT_BROKER_PORT = "61616";
private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
private static final String USER_NAME = "user";
private static final String PASSWORD = "user";
private static final String QUEUE_NAME = "PublishSubscribe";
public static void main(String[] args) throws Exception {
// 连接工厂
// 使用默认用户名、密码、路径
// 因为:底层实现:final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" +
// DEFAULT_BROKER_PORT;
// 所以:路径 tcp://host:61616
// 1 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(defaultURL);
connectionFactory.setUserName(USER_NAME);
connectionFactory.setPassword(PASSWORD);
// 2 创建连接
Connection connection = connectionFactory.createConnection();
// 3 打开连接
connection.start();
// 4 创建会话
// 第一个参数:是否开启事务
// 第二个参数:消息是否自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue(QUEUE_NAME);
// 5 创建生产者
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
// 6 创建消息
Message message = session.createTextMessage("{rn"
+ " "TEST": "发布订阅消息"rn"
+ "}");
producer.send(message);
Thread.sleep(2000);
}
// 8 关闭消息
//session.commit();
producer.close();
session.close();
connection.close();
System.out.println("消息生产成功");
}
}
2、消息消费者
①、Consumer
package queue.consumer;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
private static final String DEFAULT_BROKER_HOST = "ActiveMQ服务器地址";
private static final String DEFAULT_BROKER_PORT = "61616";
private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
private static final String USER_NAME = "user";
private static final String PASSWORD = "user";
private static final String QUEUE_NAME = "PublishSubscribe";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(defaultURL);
connectionFactory.setUserName(USER_NAME);
connectionFactory.setPassword(PASSWORD);
// 创建连接
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue(QUEUE_NAME);
// 创建消费者
MessageConsumer consumer = session.createConsumer(queue);
try {
consumer.setMessageListener(new MessageListenerCallBack(session));
} catch (Exception e) {
// TODO: handle exception
// 关闭连接
session.close();
connection.close();
System.out.println("消费结束0");
e.printStackTrace();
}
}
}
②、MessageListenerCallBack
package queue.consumer;
import java.text.SimpleDateFormat;
import java.util.Date;
import javax.jms.*;
public class MessageListenerCallBack implements MessageListener {
private Session session;
private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
public MessageListenerCallBack(Session session) {
this.session = session;
}
@Override
public void onMessage(Message message) {
// TODO 自动生成的方法存根
TextMessage textMessage = (TextMessage) message;
try {
if (textMessage != null) {
// 接收到消息
System.out.println("["+ simpleDateFormat.format(new Date()) + "] 接收到消息:" + textMessage.getText());
message.acknowledge();
} else {
//没有接收到消息,通知producer重新发送
this.session.recover();
}
} catch (JMSException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
}



