栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

ActiveMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ActiveMQ

ActiveMQ

官网地址:ActiveMQ (apache.org)

基础概念 JMS和AMQP

**JMS:**Java平台消息中间件 【消息生产者、queue队列、消息消费者】

AMQP:跨平台,【消息生产者、exchange交换机、queue队列、消息消费者】

JMS模型
  1. 点对点模型
  2. 发布、订阅模型
安装activemq 基本命令
#解压
tar -zxf apache-activemq-5.9.0-bin.tar.gz
#检查权限,如果权限不足,需要修改文件权限
ls -al apache-activemq-5.9.0/bin
chmod 755 activemq
#复制应用到本地目录
cp -r apache-activemq-5.9.0 /usr/local/activemq
#启动命令
/usr/local/activemq/bin/activemq start
#检查进程
ps aux | grep activemq
#访问
http://ip:8161/admin	username:admin	pwd:admin
#修改端口
cd /usr/local/activemq/conf/jetty.xml
关键字:jettyPort
#修改密码
vi /usr/local/activemq/conf/users.properties
内容 username:pwd
#重启
/usr/local/activemq/bin/activemq restart
#关闭
/usr/local/activemq/bin/activemq stop

#修改提供服务端口
activemq.xml 默认61616
术语
  1. destination目的地
  2. producer消息生成者
  3. consumer|receiver消息消费者
  4. message消息
Java中使用 基本使用

导入包:

		
		
			org.apache.activemq
			activemq-all
			5.9.0
		

点对点模型

producer

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
public class HelloWorldProducer {
	
	public void sendHelloWorldActiveMQ( Users msgTest){
		//定义链接工厂
		ConnectionFactory connectionFactory = null;
		//定义链接对象
		Connection connection = null;
		//定义会话
		Session session = null;
		//目的地
		Destination destination = null;
		//定义消息的发送者
		MessageProducer producer = null;
		//定义消息
		Message message = null;
		try{
			
			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.128:61616");
			//创建连接对象
			connection = connectionFactory.createConnection();
			//启动连接
			connection.start();
			
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
			destination = session.createQueue("helloworld-destination");
			//创建消息的生产者 
			producer = session.createProducer(destination);
			//传递消息
//			message = session.createTextMessage(msgTest);
			//传递消息对象
			message = session.createObjectMessage(msgTest);
			//发送消息
			producer.send(message);
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			//回收消息发送者资源
			if(producer != null){
				try {
					producer.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if(session != null){
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if(connection != null){
				try {
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
}
public static void main(String[] args) {
    HelloWorldProducer producer = new HelloWorldProducer();
    Users user = new Users();
    user.setAge("11");
    user.setName("dzy");
    producer.sendHelloWorldActiveMQ(user);
}

consumer(直接获取或者通过监听)

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
public class HelloWorldConsumer {
	
	public void readHelloWorldActiveMQ() {
		// 定义链接工厂
		ConnectionFactory connectionFactory = null;
		// 定义链接对象
		Connection connection = null;
		// 定义会话
		Session session = null;
		// 目的地
		Destination destination = null;
		// 定义消息的发送者
		MessageConsumer consumer = null;
		// 定义消息
		Message message = null;
		try {
			
			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.128:61616");
			// 创建连接对象
			connection = connectionFactory.createConnection();
			// 启动连接
			connection.start();
			
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
			destination = session.createQueue("helloworld-destination");
			// 创建消息的消费者
			consumer = session.createConsumer(destination);
            
            //第一种获取消息的方法
			// 创建消息对象
			message = consumer.receive();
			//处理消息
//			String msg = ((TextMessage)message).getText();
//			System.out.println("从ActiveMQ服务中获取的文本信息 "+msg);
			//处理对象消息
			Users msg1 = (Users)((ObjectMessage)message).getObject();
			System.out.println("返回的对象:"+msg1.toString());
            
            //第二种获取消息的方法(监听)
            consumer.setMessageListener(new MessageListener() {	
				@Override
				public void onMessage(Message message) {
					//处理消息
					String msg = null;
					try {
						msg = ((TextMessage)message).getText();
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					System.out.println("从ActiveMQ服务中获取的文本信息 "+msg);
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 回收消息发送者资源
			if (consumer != null) {
				try {
					consumer.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if (session != null) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
}
    
public class Test{
	public static void main(String[] args) {
		HelloWorldConsumer consumer = new HelloWorldConsumer();
		consumer.readHelloWorldActiveMQ();
    }
}
订阅发布模型

producer

public class HelloWorldProducerTopic {

	
	public void sendHelloWorldActiveMQ(String msgTest){
		//定义链接工厂
		ConnectionFactory connectionFactory = null;
		//定义链接对象
		Connection connection = null;
		//定义会话
		Session session = null;
		//目的地
		Destination destination = null;
		//定义消息的发送者
		MessageProducer producer = null;
		//定义消息
		Message message = null;
		try{
			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.128:61616");
			//创建连接对象
			connection = connectionFactory.createConnection();
			//启动连接
			connection.start();
			session = connection.createSession(false, 
			Session.AUTO_ACKNOWLEDGE);
			//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
			destination = session.createTopic("test-topic");
			//创建消息的生产者
			producer = session.createProducer(destination);
			//创建消息对象
			message = session.createTextMessage(msgTest);
			//发送消息
			producer.send(message);
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			//回收消息发送者资源
			if(producer != null){
			try {
				producer.close();
			} catch (JMSException e) {
			// TODO Auto-generated catch block
				e.printStackTrace();
			}
			}
			if(session != null){
			try {
				session.close();
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			if(connection != null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}

			}
		}
	}
}
public class Test {
	public static void main(String[] args) {
		HelloWorldProducerTopic topic = new HelloWorldProducerTopic();
		Users user = new Users();
		user.setAge("11");
		user.setName("dzy");
		topic.sendHelloWorldActiveMQ("tset");
	}
}

consumer

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloWorldConsumerTopic implements Runnable{
	
	public void readHelloWorldActiveMQ() {
		// 定义链接工厂
		ConnectionFactory connectionFactory = null;
		// 定义链接对象
		Connection connection = null;
		// 定义会话
		Session session = null;
		// 目的地
		Destination destination = null;
		// 定义消息的发送者
		MessageConsumer consumer = null;
		// 定义消息
		Message message = null;
		try {
			
			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.128:61616");
			// 创建连接对象
			connection = connectionFactory.createConnection();
			// 启动连接
			connection.start();
			
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
			destination = session.createTopic("test-topic");
			// 创建消息的消费者
			consumer = session.createConsumer(destination);
			consumer.setMessageListener(new MessageListener() {
				@Override
				public void onMessage(Message message) {
					String msg = null;
					try {
						msg = ((TextMessage)message).getText();
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					System.out.println("从ActiveMQ服务中获取的文本信息 "+msg);
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		} 
	}

	@Override
	public void run() {
		this.readHelloWorldActiveMQ();
	}
}

public class test{
    public static void main(String[] args) {
		HelloWorldConsumerTopic topic = new HelloWorldConsumerTopic();
		Thread t = new Thread(topic);
		t.start();
		HelloWorldConsumerTopic3 topic1 = new HelloWorldConsumerTopic3();
		Thread t1 = new Thread(topic1);
		t1.start();
		HelloWorldConsumerTopic2 topic2 = new HelloWorldConsumerTopic2();
		Thread t2 = new Thread(topic2);
		t2.start();
	}
}
整合spring producer pom文件
	
		
		
			org.apache.activemq
			activemq-all
		
		
		
			org.apache.xbean
			xbean-spring
		
		
		
			org.springframework
			spring-jms
		
		
			org.apache.activemq
			activemq-pool
		
		
			org.apache.activemq
			activemq-jms-pool
		
		
		
			junit
			junit
		
		
		
			org.slf4j
			slf4j-log4j12
		
		
		
			org.springframework
			spring-context
		
		
			org.springframework
			spring-beans
		
		
			org.springframework
			spring-webmvc
		

		
		
			jstl
			jstl
		
		
			javax.servlet
			servlet-api
			provided
		
		
			javax.servlet
			jsp-api
			provided
		
	
consumer pom文件
	
		
		
			org.apache.activemq
			activemq-all
		
		
		
			org.springframework
			spring-jms
		
		
		
			org.apache.xbean
			xbean-spring
		


		
			org.springframework
			spring-context
		
		
			org.springframework
			spring-beans
		
	
producer applicationContext-jms.xml


    
	
	
	
		
	
	
		
		
	
    
	
	
	
	
		
		
	

	
	
		
		
		
		
	

consumer applicationContext-jms.xml


	
	
	

	
	
	
		
		
	
	
	
	
	
		
		
	
	

consumer消息监听处理
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.bjsxt.pojo.Users;
import com.bjsxt.service.UserService;

@Component(value="myListener")
public class MyMessageListener implements MessageListener{
	@Autowired
	private UserService userService;
	@Override
	public void onMessage(Message message) {
		//activemq消费者对象
		ObjectMessage objMessage = (ObjectMessage)message;
		Users user=null;
		try {
			user = (Users)objMessage.getObject();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		this.userService.showUser(user);
	}
}
producer消息发送
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import com.bjsxt.pojo.Users;
import com.bjsxt.service.UserService;

@Service
public class UserServiceImpl implements UserService {
	@Autowired
	private JmsTemplate jmsTemplate;
	@Override
	public void addUser(final Users user) {
		this.jmsTemplate.send(new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				Message message = session.createObjectMessage(user);
				return message;
			}
		});
	}
}
end
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/855100.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号