栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅
  • 一、在 ESB 中配置 ActiveMQ
  • 二、配置发布者
    • 1、在 ActiveMQ 创建主题
    • 2、在 ESB 中添加代理服务
  • 三、配置订阅者
    • 1、订阅者 1 配置
    • 2、订阅者 2 配置
  • 四、发布 ESB 代理服务
  • 五、测试消息发布与订阅
  • 六、示例代码
    • 1、主题消息发布者
    • 2、主题消息订阅者
      • ①、Consumer
      • ②、MessageListenerCallBack

在此示例中,在 ActiveMQ 中创建主题,然后在 WSO2 ESB 中添加充当发布者和订阅者的代理服务。

一、在 ESB 中配置 ActiveMQ

在 ESB 中配置 ActiveMQ

二、配置发布者 1、在 ActiveMQ 创建主题

在 ActiveMQ 创建名为 SimplePublishSubscribeService 的主题

2、在 ESB 中添加代理服务

添加一个名为 PublishSubscribe 的代理服务并将其配置为发布到主题 SimplePublishSubscribeService。 可以使用管理控制台将代理服务添加到 ESB,方法是在设计视图中构建代理服务,或者将 XML 配置复制到源视图中。 或者,可以将名为 PublishSubscribe.xml 的 XML 文件添加到 /repository/deployment/server/synapse-configs/default/proxy-services。 下面给出了定义代理服务的示例 XML 代码段。 请注意,地址 URI 指定了用于配置 JMS 传输的属性。

URI:jms:/主题名称?transport.jms.ConnectionFactoryJNDIName=JMS侦听器配置的jdni名称&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://ActiveMQ服务器地址:61616&transport.jms.DestinationType=topic



    
        
            

在 ESB 发布 PublishSubscribe 代理服务后,ESB 会在 ActiveMQ 创建名为 PublishSubscribe 的队列。

三、配置订阅者

接下来,配置两个订阅 JMS 主题 SimplePublishSubscribeService 的代理服务,以便每当该主题收到消息时,它就会发送到这些订阅代理服务。 以下是这些代理服务的示例配置。

1、订阅者 1 配置


    
        
            
            
            
                
            
            
                
            
            
                $1
                
                    
                
            
            
        
        
            
        
        
    
    topic
    SimplePublishSubscribeService
    
        
            contentType
            application/json
        
    
    myTopicConnectionFactory

指定主题消息的类型,json/xml/text 等。

特别特别注意!!!!!!transport.jms.ConnectionFactory 配置的是 JMS 传输侦听器中的 parameter 名字

ClassMediator

package com;

import org.apache.synapse.MessageContext; 
import org.apache.synapse.mediators.AbstractMediator;

public class SimplePublishSubscribeService1 extends AbstractMediator { 

	private String property_name;

	public String getProperty_name() {
		return property_name;
	}

	public void setProperty_name(String property_name) {
		this.property_name = property_name;
	}
	
	
	public boolean mediate(MessageContext context) { 
		// TODO Implement your mediation logic here 
		
		String topic_data = (String) context.getProperty(property_name);
		
		System.out.println("订阅者1接收到的数据:" + topic_data);
		
		context.setProperty("datainfos", topic_data);
		
		return true;
	}
}
2、订阅者 2 配置


    
        
            
            
                
            
            
        
        
            
        
        
    
    topic
    SimplePublishSubscribeService
    
        
            contentType
            application/json
        
    
    myTopicConnectionFactory

指定主题消息的类型,json/xml/text 等。

特别特别注意!!!!!!transport.jms.ConnectionFactory 配置的是 JMS 传输侦听器中的 parameter 名字

四、发布 ESB 代理服务

发布 SimplePublishSubscribeService1 和 SimplePublishSubscribeService2 代理服务。

订阅者发布成功!

五、测试消息发布与订阅

在本实例中,ESB 中创建了名为 PublishSubscribe 的代理服务,该服务是作用是监听 ActiveMQ 的 PublishSubscribe 队列,如果该队列有消息,则把消息发布到 ActiveMQ 中的 SimplePublishSubscribeService 主题,代理服务 SimplePublishSubscribeService1 和 SimplePublishSubscribeService1 订阅了主题 SimplePublishSubscribeService,如果收到主题发布的消息,则会打印出日志。
测试方法:向 ESB 的 PublishSubscribe 代理服务发送消息,或者向 ActiveMQ 的 PublishSubscribe 队列发送消息,或者向 ActiveMQ 的 SimplePublishSubscribeService 主题发布消息, SimplePublishSubscribeService1 和 SimplePublishSubscribeService1 都会收到所订阅主题的消息。

向 ActiveMQ 的 SimplePublishSubscribeService 主题发布消息

WSO2 ESB 收到订阅消息

六、示例代码

库文件 /activemq-all-x.x.x.jar

1、主题消息发布者
package topic.producer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.alibaba.fastjson.JSONObject;

public class TopicMsgProducer {

	private static final String DEFAULT_BROKER_HOST = "192.168.131.128";
	private static final String DEFAULT_BROKER_PORT = "61616";
	private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;

	private static final String USER_NAME = "user";
	private static final String PASSWORD = "user";

	private static final String TOPIC_NAME = "SimplePublishSubscribeService";

	public static void main(String[] args) {
		new TopicMsgProducer().send();
	}

	public void send() {
		// 创建连接工厂
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

		Connection conn = null;
		try {

			factory.setBrokerURL(defaultURL);
			factory.setUserName(USER_NAME);
			factory.setPassword(PASSWORD);

			// 创建连接
			conn = factory.createConnection();
			conn.start();
			// 创建会话
			Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 创建地点
			Topic topic = session.createTopic(TOPIC_NAME);
			// 创建生产者
			MessageProducer producer = session.createProducer(topic);
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);

			TextMessage tmsg = session.createTextMessage();
			
			JSONObject jsonObject = new JSONObject();
			jsonObject.put("text", "Hello, CalvinChan!");
			
			for(int i = 0; i < 10; i++) {
				tmsg.setText(jsonObject.toJSONString());
				producer.send(tmsg);
				System.out.println("发送的消息:" + tmsg.getText());
				
				Thread.sleep(2000);
			}
			
		} catch (JMSException | InterruptedException e) {
			e.printStackTrace();
		} finally {
			try {
				if (conn != null)
					conn.close();
			} catch (Throwable ignore) {
			}
		}
	}

}

2、主题消息订阅者 ①、Consumer
package topic.consumer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicMsgConsumer {

	private static final String DEFAULT_BROKER_HOST = "192.168.131.128";
	private static final String DEFAULT_BROKER_PORT = "61616";
	private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
	
	private static final String USER_NAME = "user";
	private static final String PASSWORD = "user";
	
	private static final String TOPIC_NAME = "SimplePublishSubscribeService";
	
	private static final String CLIENT_ID = "calvinchan";
	
	
	public static void main(String[] args) {
		// TODO 自动生成的方法存根
		new TopicMsgConsumer().receive();
	}

	public void receive() {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
		Connection conn = null;
		try {
			factory.setBrokerURL(defaultURL);
			factory.setUserName(USER_NAME);
			factory.setPassword(PASSWORD);
			
			conn = factory.createConnection();
			conn.setClientID(CLIENT_ID);
			conn.start();
			Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
			// 订阅发布模式的 Topic对象 不是Destination
			Topic topic = session.createTopic(TOPIC_NAME);
			TopicSubscriber subsriber = session.createDurableSubscriber(topic, CLIENT_ID);
			subsriber.setMessageListener(new MessageListenerCallBack(session));
			while (true);
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			if (conn != null) {
				try {
					conn.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

②、MessageListenerCallBack
package topic.consumer;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

public class MessageListenerCallBack implements MessageListener {

	private Session session;
	
	private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

	public MessageListenerCallBack(Session session) {
		this.session = session;
	}
	
	@Override
	public void onMessage(Message message) {
		// TODO 自动生成的方法存根
		TextMessage textMessage = (TextMessage) message;
		
		try {
			if (textMessage != null) { 
				// 接收到消息
				System.out.println("["+ simpleDateFormat.format(new Date()) + "] 接收到消息:" + textMessage.getText());
				message.acknowledge();
			} else {
				//没有接收到消息,通知producer重新发送
				this.session.recover();
			}
		} catch (JMSException e) {
			// TODO 自动生成的 catch 块
			e.printStackTrace();
		}
	}

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/888296.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号