栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java > SpringBoot

spring整合activemq

SpringBoot 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

spring整合activemq

1.linux安装activemq

本例使用docker pull的activemq的镜像,并没有安装,
安装完成之后通过8161端口访问,输入用户名密码(admin),即可访问activemq的管理界面


image.png

2.新建一个maven项目

这是一个ssm项目。pom如下


  4.0.0
  jk.zmn
  spring-activemq
  war
  0.0.1-SNAPSHOT
  spring-activemq Maven Webapp
  http://maven.apache.org
  
    
    4.0.5.RELEASE
    3.2.1
    1.6.6
    1.2.12
    5.1.35
    2.8.8
    5.11.2
  
  
  
    
        org.springframework
        spring-core
        ${spring.version}
    
    
        org.springframework
        spring-context
        ${spring.version}
    
    
        org.springframework
        spring-context-support
        ${spring.version}
    
    
        org.springframework
        spring-aop
        ${spring.version}
    
    
        org.springframework
        spring-aspects
        ${spring.version}
    
    
        org.springframework
        spring-tx
        ${spring.version}
    
    
        org.springframework
        spring-jdbc
        ${spring.version}
    
    
        org.springframework
        spring-web
        ${spring.version}
    
    
        
            org.springframework
            spring-test
            ${spring.version}
            test
        
 
  
        
            org.springframework
            spring-webmvc
            ${spring.version}
        
        
            org.springframework
            spring-web
            ${spring.version}
        
  
  
    
        mysql
        mysql-connector-java
        ${mysql.version}
    
    
    
     
         com.alibaba
         druid
         0.2.23
     
     
    
     
  
        log4j
        log4j
        ${log4j.version}
    
    
        org.slf4j
        slf4j-api
        ${slf4j.version}
    
    
        ch.qos.logback
        logback-classic
        1.1.2
    
    
        ch.qos.logback
        logback-core
        1.1.2
    
    
        org.logback-extensions
        logback-ext-spring
        0.1.1
    

    
     
    
    
        org.mybatis
        mybatis
        ${mybatis.version}
    

    
    
        org.mybatis
        mybatis-spring
        1.2.0
    
  
          
              javax.servlet
              javax.servlet-api
              3.0.1
          
          
              javax.servlet.jsp
              javax.servlet.jsp-api
              2.3.2-b01
          
          
          
              javax.servlet
              jstl
              1.2
          
    
    
      junit
      junit
      3.8.1
    
    
    
         com.github.pagehelper
         pagehelper
         4.1.4
     

    
      com.fasterxml.jackson.core
       jackson-core
       ${jackjson.version}
    
    
       com.fasterxml.jackson.core
       jackson-annotations
       ${jackjson.version}
    
    
       com.fasterxml.jackson.core
       jackson-databind
       ${jackjson.version}
    
    
    
        org.apache.activemq
        activemq-all
        ${activemq.version}
    
    
        org.springframework
        spring-jms
        ${spring.version}
    
    
        org.springframework
        spring-context-support
        ${spring.version}
    
  
  
  
  
  
    spring-activemq
  
1.非整合spring的单机版1.queue形式

内容提供者

    @Test
    public void testQueueMqProducter() throws Exception{        //1.创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");        //2.创建链接
        Connection connection = factory.createConnection();        //3.开启连接
        connection.start();        //4.创建一个session对象
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5.创建一个目的地
        Queue testQueue = session.createQueue("testQueue");        //6.创建一个内容提供者
        MessageProducer producter = session.createProducer(testQueue);        //7.发送消息
        TextMessage QMessage = new ActiveMQTextMessage();
        QMessage.setText("我是队列信息,要减库存了");
        producter.send(QMessage);        //8.关闭资源
        session.close();
        connection.close();
        
        
    }

消费者

    @Test
    public void testQueueMqConsumer() throws Exception{        //1.创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");        //2.创建链接
        Connection connection = factory.createConnection();        //3.开启连接
        connection.start();        //4.创建一个session对象
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5.创建一个目的地
        Queue testQueue = session.createQueue("testQueue");        //6.创建一个消费者
         MessageConsumer consumer = session.createConsumer(testQueue);        //7.发送消息
         TextMessage receive = (TextMessage) consumer.receive();
         System.out.println(receive.getText());        //8.关闭资源
        session.close();
        connection.close();
        
        
    }

效果如下

image.png

2. 发布订阅模式

发布者

@Test
    public void testTopicMqProducter() throws Exception{        //1.创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");        //2.创建链接
        Connection connection = factory.createConnection();        //3.开启连接
        connection.start();        //4.创建一个session对象
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5.创建一个目的地
        Topic createTopic = session.createTopic("testTopic");        //6.创建一个内容提供者
        MessageProducer producter = session.createProducer(createTopic);        //7.发送消息
        TextMessage QMessage = new ActiveMQTextMessage();
        QMessage.setText("我是发布信息,张三抢到了手机11,rsad");
        producter.send(QMessage);        //8.关闭资源
        session.close();
        connection.close();
        
        
    }

订阅者

@Test
    public void testTopicMqConsumer() throws Exception{        //1.创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");        //2.创建链接
        Connection connection = factory.createConnection();        //3.开启连接
        connection.start();        //4.创建一个session对象
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5.创建一个目的地
        Topic createTopic = session.createTopic("testTopic");        //6.创建一个内容提供者
        MessageConsumer consumer = session.createConsumer(createTopic);        
        //获取数据
        consumer.setMessageListener(new MessageListener() {            
            @Override
            public void onMessage(Message message) {
                TextMessage message2 = (TextMessage) message;                try {
                    System.out.println(message2.getText());
                } catch (JMSException e) {                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        System.out.println("消费者1启动");
        System.in.read();        //8.关闭资源
        session.close();
        connection.close();
        
        
    }    
    @Test
    public void testTopicMqConsumer2() throws Exception{        //1.创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");        //2.创建链接
        Connection connection = factory.createConnection();        //3.开启连接
        connection.start();        //4.创建一个session对象
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5.创建一个目的地
        Topic createTopic = session.createTopic("testTopic");        //6.创建一个内容提供者
        MessageConsumer consumer = session.createConsumer(createTopic);        
        //获取数据
        consumer.setMessageListener(new MessageListener() {            
            @Override
            public void onMessage(Message message) {
                TextMessage message2 = (TextMessage) message;                try {
                    System.out.println(message2.getText());
                } catch (JMSException e) {                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        System.out.println("消费者2启动");
        System.in.read();        //8.关闭资源
        session.close();
        connection.close();
        
        
    }

效果如下


image.png

2 整合spring

spring-activemq.xml


    
    
    
        
    
    
    
        
        
    
    
    
    
    
        
        
    
    
    
    
    
        
            spring-queue
        
    
    
    
        
    
    
    
    
    
        
        
        
    
    
    
    
    
    
        
        
        
    
    
    
    
    
    
        
        
        
    
    
    
    
queue模式

内容提供者

@Test
    public void testSpringActiveMqProducter() {
        ApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
        JmsTemplate jmsTemplate = classPathXmlApplicationContext.getBean(JmsTemplate.class);
        Destination destination = (Destination) classPathXmlApplicationContext.getBean("queueDestination");
        jmsTemplate.send(destination,new MessageCreator() {            
            @Override
            public Message createMessage(Session session) throws JMSException {                
                return session.createTextMessage("生意来了,张三购买商品");
            }
        });
    }

这个消费者,要实现messagelistener的接口

public class MyMessageListener implements MessageListener{    @Override
    public void onMessage(Message message) {
        TextMessage message2 = (TextMessage) message;        try {
            System.out.println(message2.getText());
        } catch (JMSException e) {            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
发布订阅模式

发布者

@Test
    public void testSpringActiveMqTopicProducter() {
        ApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
        JmsTemplate jmsTemplate = classPathXmlApplicationContext.getBean(JmsTemplate.class);
        Destination destination = (Destination) classPathXmlApplicationContext.getBean("topicDestination");
        jmsTemplate.send(destination, new MessageCreator() {            
            @Override
            public Message createMessage(Session session) throws JMSException {                
                return session.createTextMessage("又来生意啦,李四要购买手机");
            }
        });
    }

订阅者

public class MyTopicMessageListener implements MessageListener{    @Override
    public void onMessage(Message message) {        try {
            TextMessage message2 = (TextMessage) message;
            System.out.println(message2.getText());
            System.out.println("我去减库存");
        } catch (JMSException e) {            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
public class MyTopicMessageListener2 implements MessageListener{    @Override
    public void onMessage(Message message) {        try {
            TextMessage message2 = (TextMessage) message;
            System.out.println(message2.getText());
            System.out.println("我去生成订单");
        } catch (JMSException e) {            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }

}

代码太乱,实验的话,请到码云下载,



作者:z七夜
链接:https://www.jianshu.com/p/06e306926c89

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/235685.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号