栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

ActiveMQ API编程方式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ActiveMQ API编程方式

生产者:

public class Producer {
    //默认连接用户名
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    public static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;     //连接工厂
        Connection connection = null;       //连接
        Session session = null;             //会话
        Destination destination = null;     //消息目的地
        MessageProducer messageProducer;    //消息生产者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
        try {
            //通过连接工作获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建会话,第一个参数表示是否使用事务,第二个参数表示消息的确认模式
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个名为DemoActiveMQ消息队列
            destination = session.createTopic("DemoActiveMQ");
            //创建消息生产者
            messageProducer = session.createProducer(destination);

            //发送消息
            for (int i = 0; i < 3; i++) {
                String msg = "发送第" + i + "条消息";
                TextMessage textMessage = session.createTextMessage(msg);
                messageProducer.send(textMessage);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者同步接收消息

public class Consumer {

    //默认连接用户名
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    public static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;     //连接工厂
        Connection connection = null;       //连接
        Session session = null;             //会话
        Destination destination = null;     //消息目的地
        MessageConsumer messageConsumer;    //消息消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
        try {
            //通过连接工作获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建会话,第一个参数表示是否使用事务,第二个参数表示消息的确认模式
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个名为DemoActiveMQ消息队列
            destination = session.createTopic("DemoActiveMQ");
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            //同步接受消息
            Message message = null;
            while ((message = messageConsumer.receive()) != null){
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

消费者异步接收消息

public class Consumer {

    //默认连接用户名
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    public static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;     //连接工厂
        Connection connection = null;       //连接
        Session session = null;             //会话
        Destination destination = null;     //消息目的地
        MessageConsumer messageConsumer;    //消息消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
        try {
            //通过连接工作获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建会话,第一个参数表示是否使用事务,第二个参数表示消息的确认模式
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个名为DemoActiveMQ消息队列
            destination = session.createTopic("DemoActiveMQ");
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            //异步接收消息示例
            messageConsumer.setMessageListener(new MessageListener(){
            	@Override
            	public void onMessage(Message message){
            	TextMessage textMessage = (TextMessage) message;      
            	 System.out.println(textMessage.getText());	 
            });
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/306367.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号