栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

消息中间件(异步消息传递)——ActiveMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

消息中间件(异步消息传递)——ActiveMQ

ActiveMQ 一、 ActiveMQ 简介 1 什么是 ActiveMQ

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个
完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久
的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。

2 什么是消息

“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;
也可以更复杂,可能包含嵌入对象。

3 什么是队列

4 什么是消息队列

“消息队列”是在消息的传输过程中保存消息的容器。

5 常用消息服务应用 5.1ActiveMQ

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。

5.2RabbitMQ

RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。他遵循 Mozilla Public License 开源协议。开发语言为 Erlang。

5.3RocketMQ

由阿里巴巴定义开发的一套消息队列应用服务。

二、消息服务的应用场景

消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且**不需要即时(同步)**返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。

5.1异步处理 5.1.1 用户注册

用户注册流程:
1)注册处理以及写数据库
2)发送注册成功的手机短信
3)发送注册成功的邮件信息

如果用消息中间件:则可以创建两个线程来做这些事情,直接发送消息给消息中间件,然后让邮件服务和短信服务自己去消息中间件里面去取消息,然后取到消息后再自己做对应的业务操作。就是这么方便

5.2应用的解耦 5.2.1 订单处理

生成订单流程:
1)在购物车中点击结算
2)完成支付
3)创建订单
4)调用库存系统

订单完成后,订单系统并不去直接调用库存系统,而是发送消息到消息中间件,写入一个订单信息。库存系统自己去消息中间件上去获取,然后做发货处理,并更新库存,这样能够实现互联网型应用追求的快这一个属性。而库存系统读取订单后库存应用这个操作也是非常快的,所以有消息中间件对解耦来说也是一个不错的方向。

5.3流量的削峰 5.3.1 秒杀功能

秒杀流程:
1)用户点击秒杀
2)发送请求到秒杀应用
3)在请求秒杀应用之前将请求放入到消息队列
4)秒杀应用从消息队列中获取请求并处理。

比如,系统举行秒杀活动,热门商品。流量蜂拥而至 100 件商品,10 万人挤进来怎么办?10 万秒杀的操作,放入消息队列。秒杀应用处理消息队列中的 10 万个请求中的前 100个,其他的打回,通知失败。流量峰值控制在消息队列处,秒杀应用不会瞬间被怼死.

三、 JMS 1 什么是 JMS

JMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,它便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口,简化企业应用的开发。

2 JMS 模型 2.1点对点模型(Point To Point)

生产者发送一条消息到 queue,只有一个消费者能收到。

2.2发布订阅模型(Publish/Subscribe)

发布者发送到 topic 的消息,只有订阅了 topic 的订阅者才会收到消息。

四、ActiveMQ 安装 1 下载资源

ActiveMQ 官网: http://activemq.apache.org

1.1版本说明

ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。
ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。

2 上传至 Linux 服务器 3 解压安装文件

tar -zxf apache-activemq-5.9.0-bin.tar.gz

4 检查权限

ls -al apache-activemq-5.9.0/bin
如果权限不足,则无法执行,需要修改文件权限:
chmod 755 activemq

5 复制应用至本地目录

cp -r apache-activemq-5.9.0 /usr/local/activemq -r

6 启动 ActiveMQ

/usr/local/activemq/bin/activemq start

7 测试 ActiveMQ 7.1检查进程

ps aux | grep activemq
见到下述内容即代表启动成功

7.2管理界面

使用浏览器访问 ActiveMQ 管理应用, 地址如下:
http://ip:8161/admin/
用户名: admin
密码: admin
ActiveMQ 使用的是 jetty 提供 HTTP 服务.启动稍慢,建议短暂等待再访问测试.见到如下界面代表服务启动成功
首先关闭防火墙:

如果service iptables stop关闭防火墙,没有成功!
是因为centos7不能关闭防火墙!
所以,这里要使用
systemctl stop firewalld,然后就可以访问了。


7.3修改访问端口

修改 ActiveMQ 配置文件: /usr/local/activemq/conf/jetty.xml

配置文件修改完毕,保存并重新启动 ActiveMQ 服务。

7.4修改用户名和密码

修改 conf/users.properties 配置文件.内容为: 用户名=密码


保存并重启 ActiveMQ 服务即可.

8 重启 ActiveMQ

/usr/local/activemq/bin/activemq restart

9 关闭 ActiveMQ

/usr/local/activemq/bin/activemq stop

10 配置文件 activemq.xml

配置文件中,配置的是 ActiveMQ 的核心配置信息. 是提供服务时使用的配置. 可以修改启动的访问端口. 即 java 编程中访问 ActiveMQ 的访问端口.
默认端口为 61616.
使用协议是: tcp 协议.


修改端口后, 保存并重启 ActiveMQ 服务即可.

11 ActiveMQ 目录介绍


从它的目录来说,还是很简单的:

  • bin 存放的是脚本文件
  • conf 存放的是基本配置文件
  • data 存放的是日志文件
  • docs 存放的是说明文档
  • examples 存放的是简单的实例
  • lib 存放的是 activemq 所需 jar 包
  • webapps 用于存放项目的目录
五、 ActiveMQ 术语 1 Destination

目的地,JMS Provider(消息中间件)负责维护,用于对 Message 进行管理的对象。MessageProducer 需要指定 Destination 才能发送消息,MessageReceiver 需要指定 Destination才能接收消息。

2 Producer

消息生成者,负责发送 Message 到目的地。

3 Consumer | Receiver

消息消费者,负责从目的地中消费【处理|监听|订阅】Message。

4 Message

消息,消息封装一次通信的内容。

六、 ActiveMQ 应用 1 ActiveMQ 常用 API 简介

下述 API 都是接口类型,由定义在 javax.jms 包中. 是 JMS 标准接口定义.

1.1ConnectionFactory

链接工厂, 用于创建链接的工厂类型.

1.2Connection

链接. 用于建立访问 ActiveMQ 连接的类型, 由链接工厂创建.

1.3Session

会话, 一次持久有效有状态的访问. 由链接创建.

1.4Destination & Queue

目的地, 用于描述本次访问 ActiveMQ 的消息访问目的地. 即 ActiveMQ 服务中的具体队列. 由会话创建.interface Queue extends Destination

1.5MessageProducer

消息生成者, 在一次有效会话中, 用于发送消息给ActiveMQ 服务的工具. 由会话创建.

1.6MessageConsumer

消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于从 ActiveMQ 服务中获取消息的工具. 由会话创建.

1.7Message

消息, 通过消息生成者向 ActiveMQ 服务发送消息时使用的数据载体对象或消息消费者从 ActiveMQ 服务中获取消息时使用的数据载体对象. 是所有消息【文本消息,对象消息等】具体类型的顶级接口. 可以通过会话创建或通过会话从 ActiveMQ 服务中获取.

2 JMS-HelloWorld 2.1处理文本消息 2.1.1 创建消息生产者 2.1.1.1 创建工程

2.1.1.2 修改 POM 文件添加 ActiveMQ 坐标




    4.0.0

    com.bjsxt
    mq-producer
    1.0-SNAPSHOT

    
        
        
            org.apache.activemq
            activemq-all
            5.9.0
        
    
    

2.1.1.3 编写消息的生产者


package com.bjsxt;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.stomp.JmsframeTranslator;
import org.apache.activemq.transport.stomp.Stomp;

import javax.jms.*;

public class HelloWorldProducer {
    
    public void sendHelloWorldActiveMQ(String msgTest){

        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //定义消息目的地
        Destination destination = null;

        //定义消息的发送者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接
            connection.start();

            

            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
            destination = session.createQueue("helloworld-destination");

            //创建消息的生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createTextMessage(msgTest);

            //发送消息
            producer.send(message);

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //回收消息发送者资源
            if (producer != null) {
                try {
                    producer.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (connection == null) {
                try {
                    connection.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
        
    }
}
2.1.2 创建消息消费者 2.1.2.1 创建工程

2.1.2.2 修改 POM 文件添加 ActiveMQ 坐标


    4.0.0

    org.example
    mq-consumer
    1.0-SNAPSHOT
    
        
        
            org.apache.activemq
            activemq-all
            5.9.0
        
    


2.1.2.3 编写消息的消费者
package com.bjsxt;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class HelloWorldConsumer {

    
    public void readHelloWorldActiveMQ(){

        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //定义消息目的地
        Destination destination = null;

        //定义消息的消费者
        MessageConsumer consumer = null;

        //定义消息
        Message message = null;

        try {
            
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接
            connection.start();

            

            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
            destination = session.createQueue("helloworld-destination");

            //创建消息的消费者
            consumer = session.createConsumer(destination);

            //创建消息对象
            message = consumer.receive();
            //处理消息
            String msg = ((TextMessage)message).getText();
            System.out.println("从 ActiveMQ 服务中获取n" +
                    "的文本信息"+msg);

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //回收消息发送者资源
            if (consumer != null) {
                try {
                    consumer.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (connection == null) {
                try {
                    connection.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

    }
}
2.1.3 测试 2.1.3.1 Producer
public class Test {
    public static void main(String[] args) {
        HelloWorldProducer producer = new HelloWorldProducer();
        producer.sendHelloWorldActiveMQ("HelloWorld");
    }
}


2.1.3.2 Consumer
public class Test {
    public static void main(String[] args) {
        HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.readHelloWorldActiveMQ();
    }
}


2.2处理对象消息 2.2.1 定义消息对象
public class Users implements Serializable {
    private int userid;
    private String username;
    private int userage;

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public int getUserage() {
        return userage;
    }

    public void setUserage(int userage) {
        this.userage = userage;
    }

    @Override
    public String toString() {
        return "Users{" +
                "userid=" + userid +
                ", username='" + username + ''' +
                ", userage=" + userage +
                '}';
    }
}
2.2.2 创建生产者
public class HelloWorldProducer2 {
    
    public void  sendHelloWorldActiveMQ(Users users){

        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //定义消息的发送者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接
            connection.start();

            

            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
            destination = session.createQueue("my-users");

            //创建消息的生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createObjectMessage(users);

            //发送消息
            producer.send(message);

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //回收消息发送者资源
            if (producer != null) {
                try {
                    producer.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (connection == null) {
                try {
                    connection.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
}
2.2.3 定义消息消费者
public class HelloWorldConsumer2 {

    
    public void readHelloWorldActiveMQ(){

        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //定义消息目的地
        Destination destination = null;

        //定义消息的消费者
        MessageConsumer consumer = null;

        //定义消息
        Message message = null;

        try {
            
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();


            //启动连接
            connection.start();

            

            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
            destination = session.createQueue("my-users");

            //创建消息的消费者
            consumer = session.createConsumer(destination);

            //创建消息对象
            message = consumer.receive();
            //处理消息
            ObjectMessage objMessage = (ObjectMessage)message;
            Users users = (Users)objMessage.getObject();
            System.out.println(users);
            
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //回收消息发送者资源
            if (consumer != null) {
                try {
                    consumer.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (connection == null) {
                try {
                    connection.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

    }
}

producer,Test

public class Test {
    public static void main(String[] args) {
       
        Users users = new Users();
        users.setUserid(1);
        users.setUserage(20);
        users.setUsername("zhangsan");
        HelloWorldProducer2 producer2 = new HelloWorldProducer2();
        producer2.sendHelloWorldActiveMQ(users);
    }
}


Consumer,Test

public class Test {
    public static void main(String[] args) {
       
        HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
        consumer2.readHelloWorldActiveMQ();
    }
}


3 JMS - 实现队列服务监听

队列服务监听使用的观察者设计模式

3.1创建消息生产者
public class HelloWorldProducer3 {
    
    public void  sendHelloWorldActiveMQ(Users users){

        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //定义消息的发送者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();


            //启动连接
            connection.start();

            

            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
            destination = session.createQueue("my-destination");

            //创建消息的生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createTextMessage(msgTest);

            //发送消息
            producer.send(message);

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //回收消息发送者资源
            if (producer != null) {
                try {
                    producer.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (connection == null) {
                try {
                    connection.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
}

3.2消息消费者
public class HelloWorldConsumer3 {

    
    public void readHelloWorldActiveMQ(){

        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //定义消息目的地
        Destination destination = null;

        //定义消息的消费者
        MessageConsumer consumer = null;

        //定义消息
        Message message = null;

        try {
            
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();


            //启动连接
            connection.start();

            

            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
            destination = session.createQueue("my-destination");

            //创建消息的消费者
            consumer = session.createConsumer(destination);

           consumer.setMessageListener(new MessageListener() {

               //ActiveMQ 回调的方法。通过该方法将消息传递到 consumer
               @Override
               public void onMessage(Message message) {

                   //处理消息
                   String msg = null;
                   try {
                       msg = ((TextMessage)message).getText();
                   } catch (JMSException e) {
                       e.printStackTrace();
                   }
                   System.out.println("从 ActiveMQ 服务中获取n" +
                           "的文本信息"+msg);
               }
           });

        }catch (Exception e){
            e.printStackTrace();
        
        }
    }
}

consumer,Test

public class Test {
    public static void main(String[] args) {
       

        


        HelloWorldConsumer3 consumer3 = new HelloWorldConsumer3();
        consumer3.readHelloWorldActiveMQ();
    }
}

Producer,Test

public class Test {
    public static void main(String[] args) {
       

        

        HelloWorldProducer3 producer3 = new HelloWorldProducer3();
        producer3.sendHelloWorldActiveMQ("Hello World");

    }
}


4 Topic 模型 4.1Publish/Subscribe 处理模式(Topic)

消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
当生产者发布消息,不管是否有消费者。都不会保存消息
一定要先有消息的消费者,后有消息的生产者。

4.2创建生产者
public class HelloWorldProducerTopic {
    
    public void sendHelloWorldActiveMQ(String msgTest){

        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //定义消息目的地
        Destination destination = null;

        //定义消息的发送者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接
            connection.start();

            

            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
            destination = session.createTopic("test-topic");

            //创建消息的生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createTextMessage(msgTest);

            //发送消息
            producer.send(message);

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //回收消息发送者资源
            if (producer != null) {
                try {
                    producer.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (connection == null) {
                try {
                    connection.close();
                }catch (JMSException e){
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

    }
}
4.3创建消费者

为了测试效果,这里复制三份

public class HelloWorldConsumerTopic1 implements Runnable{

    
    public void readHelloWorldActiveMQ(){

        //定义链接工厂
        ConnectionFactory connectionFactory = null;

        //定义链接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //定义消息目的地
        Destination destination = null;

        //定义消息的消费者
        MessageConsumer consumer = null;

        //定义消息
        Message message = null;

        try {
            
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.70.151:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();


            //启动连接
            connection.start();

            

            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
            destination = session.createTopic("test-topic");

            //创建消息的消费者
            consumer = session.createConsumer(destination);

           consumer.setMessageListener(new MessageListener() {

               //ActiveMQ 回调的方法。通过该方法将消息传递到 consumer
               @Override
               public void onMessage(Message message) {

                   //处理消息
                   String msg = null;
                   try {
                       msg = ((TextMessage)message).getText();
                   } catch (JMSException e) {
                       e.printStackTrace();
                   }
                   System.out.println("从 ActiveMQ 服务中获取---topic1n" + "的文本信息"+msg);
               }
           });

        }catch (Exception e){
            e.printStackTrace();

        }

    }

    @Override
    public void run() {
        this.readHelloWorldActiveMQ();
    }
}

Consumer,,,,test

public class Test {
    public static void main(String[] args) {
       

        


        

        HelloWorldConsumerTopic1 topic1 = new HelloWorldConsumerTopic1();
        Thread t1 = new Thread(topic1);
        t1.start();

        HelloWorldConsumerTopic2 topic2 = new HelloWorldConsumerTopic2();
        Thread t2 = new Thread(topic2);
        t2.start();

        HelloWorldConsumerTopic3 topic3 = new HelloWorldConsumerTopic3();
        Thread t3 = new Thread(topic3);
        t3.start();

    }
}

Producer,Test

public class Test {
    public static void main(String[] args) {
       

        

       

        HelloWorldProducerTopic topic = new HelloWorldProducerTopic();
        topic.sendHelloWorldActiveMQ("Hello Topic");
    }
}

先运行消费者,再运行生产者

七、 Spring 整合 ActiveMQ 1 创建 spring-activemq-producer 1.1修改 POM 文件


	4.0.0
	com.bjsxt
	parent
	0.0.1-SNAPSHOT
	pom

	
	
		5.9.0
		4.5
		4.1.6.RELEASE
		5.9.0
		4.10.3
		2.9.0
		4.12
		4.1.3.RELEASE
		3.2.8
		1.2.2
		5.1.32
		1.6.4
		1.0.9
		1.2
		2.5
		2.2
		2.0
		0.10
		2.5.4
		2.4.2
		3.3
		1.3.1
	


	
	
		
		
		
			org.apache.activemq
			activemq-all
			${activemq.version}
		
		
		
			org.apache.xbean
			xbean-spring
			${xbean.version}
		
		
		
			org.springframework
			spring-jms
			${jms.version}
		
		
		
		    org.apache.activemq
		    activemq-pool
		    ${activemq-pool.version}
		
		
		    org.apache.activemq
		    activemq-jms-pool
		    ${activemq-pool.version}
		
		
		
			org.apache.solr
			solr-solrj
			${solrj.version}
		
		
			redis.clients
			jedis
			${jedis.version}
		
			
			
				junit
				junit
				${junit.version}
			
			
			
				org.slf4j
				slf4j-log4j12
				${slf4j.version}
			
			
			
				org.mybatis
				mybatis
				${mybatis.version}
			
			
				org.mybatis
				mybatis-spring
				${mybatis.spring.version}
			
			
			
				mysql
				mysql-connector-java
				${mysql.version}
			
			
			
				com.alibaba
				druid
				${druid.version}
			
			
			
				org.springframework
				spring-context
				${spring.version}
			
			
				org.springframework
				spring-beans
				${spring.version}
			
			
				org.springframework
				spring-webmvc
				${spring.version}
			
			
				org.springframework
				spring-jdbc
				${spring.version}
			
			
				org.springframework
				spring-aspects
				${spring.version}
			
			
			
				jstl
				jstl
				${jstl.version}
			
			
				javax.servlet
				servlet-api
				${servlet-api.version}
				provided
			
			
				javax.servlet
				jsp-api
				${jsp-api.version}
				provided
			
			
			
				commons-fileupload
				commons-fileupload
				${commons-fileupload.version}
			
			
			
				commons-net
				commons-net
				${commons-net.version}
			
			
			
			
				com.fasterxml.jackson.core
				jackson-databind
				${jackson.version}
			
		
	

	
		
			
				src/main/java
				
					***.xml
					**
@Component(value="myListener")
public class MyMessageListener implements MessageListener{

	@Autowired
	private UserService userService;
	
	@Override
	public void onMessage(Message message) {
		//处理消息
		ObjectMessage objMessage = (ObjectMessage)message;
		Users user=null;
		try {
			user = (Users)objMessage.getObject();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		this.userService.showUser(user);
	}
}

练习项目源码:https://gitee.com/cutelili/active-mq/tree/master/

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327826.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号