本例使用 docker pull 拉取的 ActiveMQ 镜像来运行,并没有在本机单独安装。
容器启动之后通过 8161 端口访问,输入用户名和密码(admin),即可访问 ActiveMQ 的管理界面。
image.png
2. 新建一个 Maven 项目,这是一个 SSM 项目。pom 如下:
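1. 非整合 Spring 的单机版
1. queue 形式
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>jk.zmn</groupId>
  <artifactId>spring-activemq</artifactId>
  <packaging>war</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>spring-activemq Maven Webapp</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spring.version>4.0.5.RELEASE</spring.version>
    <mybatis.version>3.2.1</mybatis.version>
    <slf4j.version>1.6.6</slf4j.version>
    <log4j.version>1.2.12</log4j.version>
    <mysql.version>5.1.35</mysql.version>
    <jackjson.version>2.8.8</jackjson.version>
    <activemq.version>5.11.2</activemq.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context-support</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aspects</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${spring.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>0.2.23</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.1.2</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.1.2</version>
    </dependency>
    <dependency>
      <groupId>org.logback-extensions</groupId>
      <artifactId>logback-ext-spring</artifactId>
      <version>0.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>${mybatis.version}</version>
    </dependency>
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis-spring</artifactId>
      <version>1.2.0</version>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.0.1</version>
    </dependency>
    <dependency>
      <groupId>javax.servlet.jsp</groupId>
      <artifactId>javax.servlet.jsp-api</artifactId>
      <version>2.3.2-b01</version>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>jstl</artifactId>
      <version>1.2</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
    </dependency>
    <dependency>
      <groupId>com.github.pagehelper</groupId>
      <artifactId>pagehelper</artifactId>
      <version>4.1.4</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackjson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackjson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackjson.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>${activemq.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context-support</artifactId>
      <version>${spring.version}</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>spring-activemq</finalName>
  </build>
</project>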
内容提供者
/**
 * Sends one text message to the {@code testQueue} queue on the broker at
 * {@code tcp://www.itzmn.com:61616} using the plain JMS API (no Spring).
 *
 * <p>The session and connection are released in {@code finally} blocks so they
 * are closed even when creating the producer or sending the message fails.
 *
 * @throws Exception if connecting to the broker or sending the message fails
 */
@Test
public void testQueueMqProducter() throws Exception {
    // 1. Create the connection factory.
    ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
    // 2. Create the connection.
    Connection connection = factory.createConnection();
    try {
        // 3. Start the connection.
        connection.start();
        // 4. Create a non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            // 5. Create the destination.
            Queue testQueue = session.createQueue("testQueue");
            // 6. Create the producer bound to that destination.
            MessageProducer producer = session.createProducer(testQueue);
            // 7. Build the message through the session (portable JMS API) and send it.
            TextMessage queueMessage = session.createTextMessage("我是队列信息,要减库存了");
            producer.send(queueMessage);
        } finally {
            // 8. Release resources, innermost first.
            session.close();
        }
    } finally {
        connection.close();
    }
}
消费者
/**
 * Receives one text message from the {@code testQueue} queue on the broker at
 * {@code tcp://www.itzmn.com:61616} and prints its text to standard output.
 *
 * <p>{@code receive()} is called without a timeout, so this test waits until a
 * message arrives. The session and connection are released in {@code finally}
 * blocks so they are closed even when receiving fails.
 *
 * @throws Exception if connecting to the broker or receiving the message fails
 */
@Test
public void testQueueMqConsumer() throws Exception {
    // 1. Create the connection factory.
    ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
    // 2. Create the connection.
    Connection connection = factory.createConnection();
    try {
        // 3. Start the connection.
        connection.start();
        // 4. Create a non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            // 5. Create the destination.
            Queue testQueue = session.createQueue("testQueue");
            // 6. Create the consumer bound to that destination.
            MessageConsumer consumer = session.createConsumer(testQueue);
            // 7. Receive the message synchronously and print its text.
            TextMessage received = (TextMessage) consumer.receive();
            System.out.println(received.getText());
        } finally {
            // 8. Release resources, innermost first.
            session.close();
        }
    } finally {
        connection.close();
    }
}
效果如下
image.png
2. 发布订阅模式
发布者
/**
 * Publishes one text message to the {@code testTopic} topic on the broker at
 * {@code tcp://www.itzmn.com:61616} using the plain JMS API (no Spring).
 *
 * <p>The session and connection are released in {@code finally} blocks so they
 * are closed even when creating the producer or sending the message fails.
 *
 * @throws Exception if connecting to the broker or sending the message fails
 */
@Test
public void testTopicMqProducter() throws Exception {
    // 1. Create the connection factory.
    ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
    // 2. Create the connection.
    Connection connection = factory.createConnection();
    try {
        // 3. Start the connection.
        connection.start();
        // 4. Create a non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            // 5. Create the destination.
            Topic topic = session.createTopic("testTopic");
            // 6. Create the producer bound to that destination.
            MessageProducer producer = session.createProducer(topic);
            // 7. Build the message through the session (portable JMS API) and send it.
            TextMessage topicMessage = session.createTextMessage("我是发布信息,张三抢到了手机11,rsad");
            producer.send(topicMessage);
        } finally {
            // 8. Release resources, innermost first.
            session.close();
        }
    } finally {
        connection.close();
    }
}
订阅者
/**
 * Subscribes to the {@code testTopic} topic on the broker at
 * {@code tcp://www.itzmn.com:61616} and prints the text of each message handed
 * to the listener (subscriber 1).
 *
 * <p>After registering the listener the method blocks on {@code System.in.read()},
 * so the subscription stays open until a byte can be read from standard input.
 * The session and connection are released in {@code finally} blocks so they are
 * closed even when setup fails.
 *
 * @throws Exception if connecting to the broker, subscribing, or reading from
 *     standard input fails
 */
@Test
public void testTopicMqConsumer() throws Exception {
    // 1. Create the connection factory.
    ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
    // 2. Create the connection.
    Connection connection = factory.createConnection();
    try {
        // 3. Start the connection.
        connection.start();
        // 4. Create a non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            // 5. Create the destination.
            Topic topic = session.createTopic("testTopic");
            // 6. Create the consumer (subscriber) bound to that destination.
            MessageConsumer consumer = session.createConsumer(topic);
            // 7. Receive messages asynchronously through a listener.
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // Report the failure without rethrowing it from the listener.
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("消费者1启动");
            // Keep the test alive until input arrives on standard input.
            System.in.read();
        } finally {
            // 8. Release resources, innermost first.
            session.close();
        }
    } finally {
        connection.close();
    }
}
/**
 * Subscribes to the {@code testTopic} topic on the broker at
 * {@code tcp://www.itzmn.com:61616} and prints the text of each message handed
 * to the listener (subscriber 2).
 *
 * <p>After registering the listener the method blocks on {@code System.in.read()},
 * so the subscription stays open until a byte can be read from standard input.
 * The session and connection are released in {@code finally} blocks so they are
 * closed even when setup fails.
 *
 * @throws Exception if connecting to the broker, subscribing, or reading from
 *     standard input fails
 */
@Test
public void testTopicMqConsumer2() throws Exception {
    // 1. Create the connection factory.
    ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
    // 2. Create the connection.
    Connection connection = factory.createConnection();
    try {
        // 3. Start the connection.
        connection.start();
        // 4. Create a non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            // 5. Create the destination.
            Topic topic = session.createTopic("testTopic");
            // 6. Create the consumer (subscriber) bound to that destination.
            MessageConsumer consumer = session.createConsumer(topic);
            // 7. Receive messages asynchronously through a listener.
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // Report the failure without rethrowing it from the listener.
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("消费者2启动");
            // Keep the test alive until input arrives on standard input.
            System.in.read();
        } finally {
            // 8. Release resources, innermost first.
            session.close();
        }
    } finally {
        connection.close();
    }
}
效果如下
image.png
2. 整合 Spring
spring-activemq.xml
queue 模式
spring-queue
内容提供者
/**
 * Loads the Spring context from {@code classpath:applicationContext-activemq.xml},
 * looks up the {@code JmsTemplate} bean and the bean named {@code queueDestination},
 * and sends one text message to that destination.
 *
 * <p>NOTE(review): the application context created here is never closed; confirm
 * whether that is intentional (e.g. to keep listeners in the same context alive).
 */
@Test
public void testSpringActiveMqProducter() {
    final ApplicationContext context =
            new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
    final JmsTemplate template = context.getBean(JmsTemplate.class);
    final Destination queue = (Destination) context.getBean("queueDestination");

    // Builds the text message inside the session that JmsTemplate supplies.
    final MessageCreator orderMessage = new MessageCreator() {
        @Override
        public Message createMessage(Session jmsSession) throws JMSException {
            return jmsSession.createTextMessage("生意来了,张三购买商品");
        }
    };

    template.send(queue, orderMessage);
}
这个消费者,要实现messagelistener的接口
/**
 * JMS message listener that prints the text of each message it is given.
 */
public class MyMessageListener implements MessageListener {

    /**
     * Casts the incoming message to {@code TextMessage} and prints its text to
     * standard output. A {@code JMSException} raised while reading the text is
     * printed with {@code printStackTrace()} and not rethrown.
     *
     * @param message the delivered message; cast to {@code TextMessage} without a type check
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage textMessage = (TextMessage) message;
        final String body;
        try {
            body = textMessage.getText();
        } catch (JMSException readFailure) {
            readFailure.printStackTrace();
            return;
        }
        System.out.println(body);
    }
}
发布订阅模式发布者
/**
 * Loads the Spring context from {@code classpath:applicationContext-activemq.xml},
 * looks up the {@code JmsTemplate} bean and the bean named {@code topicDestination},
 * and sends one text message to that destination.
 *
 * <p>NOTE(review): the application context created here is never closed; confirm
 * whether that is intentional (e.g. to keep listeners in the same context alive).
 */
@Test
public void testSpringActiveMqTopicProducter() {
    final ApplicationContext context =
            new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
    final JmsTemplate template = context.getBean(JmsTemplate.class);
    final Destination topic = (Destination) context.getBean("topicDestination");

    // Builds the text message inside the session that JmsTemplate supplies.
    final MessageCreator purchaseMessage = new MessageCreator() {
        @Override
        public Message createMessage(Session jmsSession) throws JMSException {
            return jmsSession.createTextMessage("又来生意啦,李四要购买手机");
        }
    };

    template.send(topic, purchaseMessage);
}
订阅者
/**
 * Topic subscriber that prints the text of each message followed by a fixed
 * "reduce stock" line.
 */
public class MyTopicMessageListener implements MessageListener {

    /**
     * Prints the message text and then the fixed follow-up line. If the cast
     * target's {@code getText()} raises a {@code JMSException}, the stack trace
     * is printed and nothing else is written.
     *
     * @param message the delivered message; cast to {@code TextMessage} without a type check
     */
    @Override
    public void onMessage(Message message) {
        final String body;
        try {
            body = ((TextMessage) message).getText();
        } catch (JMSException readFailure) {
            readFailure.printStackTrace();
            return;
        }
        System.out.println(body);
        System.out.println("我去减库存");
    }
}

/**
 * Topic subscriber that prints the text of each message followed by a fixed
 * "create order" line.
 */
public class MyTopicMessageListener2 implements MessageListener {

    /**
     * Prints the message text and then the fixed follow-up line. If the cast
     * target's {@code getText()} raises a {@code JMSException}, the stack trace
     * is printed and nothing else is written.
     *
     * @param message the delivered message; cast to {@code TextMessage} without a type check
     */
    @Override
    public void onMessage(Message message) {
        final String body;
        try {
            body = ((TextMessage) message).getText();
        } catch (JMSException readFailure) {
            readFailure.printStackTrace();
            return;
        }
        System.out.println(body);
        System.out.println("我去生成订单");
    }
}
代码太乱,实验的话,请到码云下载,
作者:z七夜
链接:https://www.jianshu.com/p/06e306926c89



