依赖
1.发送消息到MQ队列 MQ连接工厂类:javax.jms jms 1.1 org.apache.activemq activemq-all 5.15.13 junit junit 4.13.2 test
通过该类的静态方法可以直接获取Connection
package com.lt.service;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
public class ActiveMQConnFactory {
static {
//连接信息可以在配置文件中取
ConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","failover:(tcp://127.0.0.1:61616)");
//构造从工厂得到连接对象
try {
conn = factory.createConnection();
} catch (JMSException e) {
e.printStackTrace();
}
}
private static Connection conn;
private ActiveMQConnFactory(){}
public static Connection getConnection() throws JMSException {
conn.start();
return conn;
}
}
消息发送者类:
构造方法;
发送文本消息方法;
发送map消息方法;
关闭session的方法;
package com.lt.service.producer;
import com.lt.service.ActiveMQConnFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import javax.jms.*;
//消息发送者
public class Producer {
//连接工厂,JMS用它创建连接
private Connection connection;
//一个发送或接收消息的线程
private Session session;
//MessageProducer: 消息发送者
private MessageProducer producer;
//队列,目的地
private Destination destination;
//构造器
public Producer() throws JMSException {
// 从工厂得到连接对象
connection = ActiveMQConnFactory.getConnection();
//获取操作连接
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
//得到消息生成者,即发送者,参数为queue
producer = session.createProducer(null);
//设置持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
//发送text消息
public void sendMessage(String queue,String str){
try {
destination = session.createQueue(queue);
//声明发送消息的类型
TextMessage message = new ActiveMQTextMessage();
message.setText(str);
producer.send(destination,message);
close();
} catch (JMSException e) {
e.printStackTrace();
}
}
//发送map消息
public void sendMessage(String queue, ActiveMQMapMessage map) throws JMSException {
destination = session.createQueue(queue);
producer.send(destination,map);
close();
}
public void close() throws JMSException {
if(session != null){
session.close();
}
}
}
发送文本消息方法封装类:
对发送文本消息进行再次封装,这样就可以通过静态方法进行调用了。
package com.lt.service.producer;
import javax.jms.JMSException;
public class ActivemqSender {
//发送文本消息
public static void sendTextMessageage(String queue,String message){
try {
Producer producer = new Producer();
producer.sendMessage(queue, message);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
测试代码(测试类中):
@Test
public void sendMap() throws JMSException {
Producer producer = new Producer();
ActiveMQMapMessage mapMessage = new ActiveMQMapMessage();
mapMessage.setString("test1","test11");
mapMessage.setString("test2","test22");
mapMessage.setInt("test3",75);
producer.sendMessage("test",mapMessage);
}
@Test
public void sendText(){
ActivemqSender.sendTextMessageage("test","测试文本");
}
2.固定时间内接收一条消息
消息接收类:
package com.lt.service.consumer;
import com.lt.service.ActiveMQConnFactory;
import javax.jms.*;
//消息接收类
public class Consumer {
//Connection : JMS客户端到JMS Provider的连接
private Connection connection;
//Session : 一个发送或接收消息的线程
private Session session;
// Destination :消息目的地
private Destination destination;
//消费者,消息接受者
private MessageConsumer consumer;
public Consumer() throws JMSException {
//从工厂得到连接对象
connection = ActiveMQConnFactory.getConnection();
connection.start();
//获取连接操作
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
}
//接收消息(每次只接收一个) 可接收文本消息或Map,别的消息会报异常,可以自己进行扩展
public Object onMessage(String queue, long time) throws Exception {
destination = session.createQueue(queue);
consumer = session.createConsumer(destination);
//Receive the next message that within the specified timeout interval.
//接收下一条消息在一个明确的时间间隔内。(单位:毫秒)
Message receive = consumer.receive(time);
if(null == receive){
return null;
}
//此处注意,若队列中存储的消息类型不是TextMessage,也会接收消息,队列中该消息就会消失
if(receive instanceof TextMessage){
TextMessage textMessage = (TextMessage) receive;
if (null == textMessage){
return null;
}
close();
return textMessage.getText();
}else if(receive instanceof MapMessage){
MapMessage mapMessage = (MapMessage) receive;
if(null == mapMessage){
return null;
}
close();
return mapMessage;
} else{
throw new Exception("类型不匹配");
}
}
public void addListener(String queue,MessageListener listener){
try {
Destination destination = session.createQueue(queue);
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(listener);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void close() throws JMSException {
if(consumer != null){
consumer.close();
}
if(session != null){
session.close();
}
}
}
测试代码:
@Test
public void getMessage() throws Exception {
Consumer consumer = new Consumer();
//监听10秒,看看是否有消息进入队列,如果有,则进行消费
Object result = consumer.onMessage("test", 100000);
if(result instanceof String){
System.out.println(result);
}else if(result instanceof MapMessage){
MapMessage mapMessage = (MapMessage) result;
System.out.println(mapMessage.toString());
System.out.println(mapMessage.getObject("test1"));
}else{
System.out.println("队列中无消息");
}
}
3.监听MQ消息队列
监听类:
增加监听的方法,在2中含有,即:addListener方法
package com.lt.service.listener;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class TextListener implements MessageListener {
@Override
public void onMessage(Message message) {
if(null == message){
System.out.println("消息为空");
}else{
if(message instanceof TextMessage){
System.out.println(message);
}else if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage) message;
System.out.println(mapMessage.toString());
}else{
System.out.println("未知类型:" + message);
}
}
}
}
测试代码:
@Test
public void addListener() throws JMSException {
Consumer consumer = new Consumer();
consumer.addListener("test?consumer.prefetchSize=10",new TextListener());
//while(true) 是为了不关闭监听,测试使用
while (true){}
}



