栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ActiveMQ实现发送消息和异步接收消息代码示例

ActiveMQ实现发送消息和异步接收消息代码示例

ActiveMQ实现发送消息和接收消息Demo 1. 添加依赖

    org.apache.activemq
    activemq-all
    5.5.0

2. 代码示例
public class ActiveMQClient {
    
    private Logger logger = LoggerFactory.getLogger(ActiveMQClient.class);
    private String userName = "admin";
    private String password = "admin";
    private String brokerUrl = "127.0.0.1:61616";
    private Connection connection = null;
    private boolean connected = false;
    Destination destination = null;
    private MessageProducer producer = null;
    private MessageConsumer consumer = null;
    private Session session = null;

    
    public boolean connect() {
        ConnectionFactory connectionFactory;
        try {
            connectionFactory = new ActiveMQConnectionFactory(userName,password,"tcp://"+ brokerUrl);
            connection = connectionFactory.createConnection();
            connection.start();
            connected = true;
            logger.info("[ActiveMQ连接服务]服务连接成功");
            return connected;
        } catch (JMSException e) {
            e.printStackTrace();
            connected = false;
            logger.info("[ActiveMQ连接服务]服务连接失败");
            return connected;
        }
    }

    
    public boolean disconnect() {
        try {
            if(producer != null) {
                producer.close();
            }
            if(consumer != null) {
                consumer.close();
            }
            if(session != null) {
                session.close();
            }
            if(connection != null) {
                connection.close();
                connected = false;
            }
            logger.info("[ActiveMQ断开服务]服务断开成功");
            return true;
        } catch (JMSException e) {
            e.printStackTrace();
            logger.info("[ActiveMQ断开服务]服务断开失败,发生异常");
            return false;
        }
    }

    
    public boolean sendMessage(String topic, String message) {
        if(connection == null || !connected) {
            logger.info("[ActiveMQ发送消息]消息发送失败,服务尚未连接。");
            return false;
        }
        try {
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic(topic);
            producer = session.createProducer(destination);
            TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
            logger.info("[ActiveMQ发送消息]消息发送成功。主题:{},消息:{}", topic,message);
            return true;
        } catch (JMSException e) {
            e.printStackTrace();
            logger.info("[ActiveMQ发送消息]消息发送失败,发生异常。");
            return false;
        }
    }

    
    public boolean receiveMessage(String topic) {
        if(connection == null || !connected) {
            logger.info("[ActiveMQ订阅消息]消息订阅失败,服务尚未连接。");
            return false;
        }
        try {
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic(topic);
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(message -> {
                try {
                    if(message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        logger.info("[ActiveMQ接收消息]消息主题:{},消息:{}", topic,textMessage.getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                    logger.info("[ActiveMQ接收消息]失败,发生异常");
                }

            });
            logger.info("[ActiveMQ订阅消息]消息订阅成功。");
            return true;
        } catch (JMSException e) {
            e.printStackTrace();
            logger.info("[ActiveMQ订阅消息]失败,发生异常。");
            return false;
        }
    }
    
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/680893.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号