栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费
  • 一、在 ESB 中配置 ActiveMQ
    • 1、复制库文件
      • ①、ESB 配置
      • ②、ActiveMQ 配置
    • 2、在 ESB 中配置 JMS 传输侦听器和发送器
      • ①、设置 JMS 侦听器
      • ②、设置 JMS 发送器
  • 二、向 JMS 队列发送消息
    • 1、创建消息中介构件
    • 2、REST API 配置文件
    • 3、AddressEndpoint 配置
    • 4、测试配置
  • 三、监听 JMS 队列中的消息
    • 1、创建消息中介构件
    • 2、代理服务配置文件
    • 3、测试配置
  • 四、示例程序
    • 1、消息生产者
    • 2、消息消费者
      • ①、Consumer
      • ②、MessageListenerCallBack

        WSO2 ESB 的 Java 消息服务 (JMS) 传输可以轻松地向任何实现 JMS 规范的 JMS 服务的队列和主题发送和接收消息。

        JMS 传输实现来自 WS-Commons Transports 项目,它利用 JNDI 连接到各种 JMS 代理。 因此,WSO2 ESB 可以与任何提供 JNDI 支持的 JMS 代理一起工作。 所有相关类都打包到axis2-transport-jms-.jar 中,org.apache.axis2.transport.jms.JMSListener 和org.apache.axis2.transport.jms.JMSSender 类分别充当传输接收器和发送器。

        JMS 传输实现需要一个活动的 JMS 服务器实例才能接收和发送消息。 建议使用 WSO2 Message Broker 或 Apache ActiveMQ,但也支持其他,例如 Apache Qpid 和 Tibco。

一、在 ESB 中配置 ActiveMQ 1、复制库文件 ①、ESB 配置

将以下库文件从 /lib 目录复制到 /repository/components/lib 目录。

ActiveMQ 5.8.0 及以上

  • activemq-broker-5.8.0.jar
  • activemq-client-5.8.0.jar
  • activemq-kahadb-store-5.8.0.jar
  • geronimo-jms_1.1_spec-1.1.1.jar
  • geronimo-j2ee-management_1.1_spec-1.0.1.jar
  • geronimo-jta_1.0.1B_spec-1.0.1.jar
  • hawtbuf-1.9.jar
  • Slf4j-api-1.6.6.jar
  • activeio-core-3.1.4.jar(在 /lib/optional 文件夹中)

低版本的 ActiveMQ

  • activemq-core-5.5.1.jar
  • geronimo-j2ee-management_1.0_spec-1.0.jar
  • geronimo-jms_1.1_spec-1.1.1.jar

ActiveMQ 应该在启动 ESB 之前启动并运行

②、ActiveMQ 配置

修改/conf/jetty.xml配置文件,不修改的话除本机外无法访问管理控制台,找到 jettyPortbean,将 host 属性从原来的 127.0.0.1 改为 0.0.0.0,修改完成后保存。

2、在 ESB 中配置 JMS 传输侦听器和发送器 ①、设置 JMS 侦听器

要启用 JMS 传输侦听器,请取消注释 /repository/conf/axis2/axis2.xml 文件中与 ActiveMQ 相关的以下侦听器配置。



    
        org.apache.activemq.jndi.ActiveMQInitialContextFactory
        tcp://ActiveMQ服务器地址:61616
        TopicConnectionFactory
        topic
    
    
        org.apache.activemq.jndi.ActiveMQInitialContextFactory
        tcp://ActiveMQ服务器地址:61616
        QueueConnectionFactory
        queue
    
    
        org.apache.activemq.jndi.ActiveMQInitialContextFactory
        tcp://ActiveMQ服务器地址:61616
        QueueConnectionFactory
        queue
    

②、设置 JMS 发送器

        要启用 JMS 传输发送器,请取消注释 /repository/conf/axis2/axis2.xml 文件中的以下配置。

        上述配置并没有解决 ActiveMQ 消息代理的瞬时故障问题。假设由于某种原因 ActiveMQ 宕机了一段时间后又恢复了。 ESB 不会重新连接到 ActiveMQ,而是在向 ESB 发送请求时抛出一些错误,直到重新启动。 为了解决这个问题,需要在 java.naming.provider.url 的位置添加以下配置:failover:tcp://localhost:61616

        这只会确保发生重新连接。

二、向 JMS 队列发送消息

        HTTP 客户端向 ESB 代理服务发送请求,代理服务会把 HTTP 客户端的请求体转发到 ActiveMQ 中指定的队列。(注意,该 HTTP 请求是没有响应消息的)

1、创建消息中介构件


2、REST API 配置文件

        在 inSequence 中,该 OUT_ONLY 属性设置为 true 以指示消息交换是单向的。



    
        
            
            
                
                    
3、AddressEndpoint 配置

URI:jms:/队列名称?transport.jms.ConnectionFactoryJNDIName=JMS侦听器配置的jdni名称&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://ActiveMQ服务器地址:61616&transport.jms.DestinationType=queue

4、测试配置

使用 postman 向 api 发送消息,注意,这个是没有响应消息的!!!

在 ActiveMQ 管理控制台可以看到待消费的消息

消费消息

三、监听 JMS 队列中的消息

        ESB 在 ActiveMQ 中创建一个和代理服务同名的队列,同时监听该队列中的消息,如果有生产者向该队列生产消息,ESB 会把队列中的消息转发到代理服务指定的 URL。

1、创建消息中介构件


2、代理服务配置文件

OUT_ONLY 属性设置为 true 以指示消息交换是单向的。

将传输设置为 jms 来使代理服务成为 JMS 侦听器。 一旦为代理服务启用了 JMS 传输,ESB 就会开始侦听与代理服务同名的 JMS 队列。

此示例配置,ESB 会侦听名为 JMSConsumerProxy 的 JMS 队列。 要使代理服务侦听不同的 JMS 队列,请使用目标队列的名称定义 transport.jms.Destination 参数。

在此示例配置, ActiveMQ生产的消息将被发送到后端服务,ESB 不会等待服务的响应。

发布代理服务后,ESB 会在 ActiveMQ 中创建一个和代理服务同名的队列



    
        
            
3、测试配置



ActiveMQ 控制台可以看到被监听的队列

四、示例程序 1、消息生产者

库文件 /activemq-all-x.x.x.jar

package queue.producer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

	private static final String DEFAULT_BROKER_HOST = "ActiveMQ服务器地址";
	private static final String DEFAULT_BROKER_PORT = "61616";
	private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
	
	private static final String USER_NAME = "user";
	private static final String PASSWORD = "user";

	private static final String QUEUE_NAME = "PublishSubscribe";

	public static void main(String[] args) throws Exception {

		// 连接工厂
		// 使用默认用户名、密码、路径
		// 因为:底层实现:final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" +
		// DEFAULT_BROKER_PORT;
		// 所以:路径 tcp://host:61616
		// 1 创建连接工厂
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
		
		connectionFactory.setBrokerURL(defaultURL);
		connectionFactory.setUserName(USER_NAME);
		connectionFactory.setPassword(PASSWORD);
		
		// 2 创建连接
		Connection connection = connectionFactory.createConnection();
		// 3 打开连接
		connection.start();
		// 4 创建会话
		// 第一个参数:是否开启事务
		// 第二个参数:消息是否自动确认
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 创建队列
		Queue queue = session.createQueue(QUEUE_NAME);
		// 5 创建生产者
		MessageProducer producer = session.createProducer(queue);
		
		
		for (int i = 0; i < 10; i++) {
			// 6 创建消息
			Message message = session.createTextMessage("{rn"
					+ "    "TEST": "发布订阅消息"rn"
					+ "}");
			producer.send(message);
			
			Thread.sleep(2000);
		}
		// 8 关闭消息
		//session.commit();
		producer.close();
		session.close();
		connection.close();
		System.out.println("消息生产成功");
	}
}
2、消息消费者 ①、Consumer
package queue.consumer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

	private static final String DEFAULT_BROKER_HOST = "ActiveMQ服务器地址";
	private static final String DEFAULT_BROKER_PORT = "61616";
	private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
	
	private static final String USER_NAME = "user";
	private static final String PASSWORD = "user";
	
	private static final String QUEUE_NAME = "PublishSubscribe";

	public static void main(String[] args) throws Exception {

		// 创建连接工厂
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
		
		connectionFactory.setBrokerURL(defaultURL);
		connectionFactory.setUserName(USER_NAME);
		connectionFactory.setPassword(PASSWORD);
		
		// 创建连接
		Connection connection = connectionFactory.createConnection();
		// 开启连接
		connection.start();
		// 创建会话
		
		Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		// 创建队列
		Queue queue = session.createQueue(QUEUE_NAME);
		// 创建消费者
		MessageConsumer consumer = session.createConsumer(queue);
		try {
			consumer.setMessageListener(new MessageListenerCallBack(session));
		} catch (Exception e) {
			// TODO: handle exception
			// 关闭连接
			session.close();
			connection.close();
			System.out.println("消费结束0");
			e.printStackTrace();
		}
	}
}

②、MessageListenerCallBack
package queue.consumer;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.*;

public class MessageListenerCallBack implements MessageListener {

	private Session session;
	
	private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

	public MessageListenerCallBack(Session session) {
		this.session = session;
	}

	@Override
	public void onMessage(Message message) {
		// TODO 自动生成的方法存根
		TextMessage textMessage = (TextMessage) message;
		try {
			if (textMessage != null) { 
				// 接收到消息
				System.out.println("["+ simpleDateFormat.format(new Date()) + "] 接收到消息:" + textMessage.getText());
				message.acknowledge();
			} else {
				//没有接收到消息,通知producer重新发送
				this.session.recover();
			}
		} catch (JMSException e) {
			// TODO 自动生成的 catch 块
			e.printStackTrace();
		}
	}

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/885485.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号