栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot整合ActiveMQ

SpringBoot整合ActiveMQ

1.加入依赖

 


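<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.4</version>
</dependency>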

2.增加配置类

package com.example.mq.config.properties;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Connection settings for the ActiveMQ broker, bound from the
 * {@code spring.activemq.*} configuration properties.
 *
 * <p>Binding is performed by {@link ConfigurationProperties} through the
 * setters that Lombok's {@code @Data} generates. The fields deliberately carry
 * no {@code @Value} annotation: a {@code @Value} without a {@code ${...}}
 * placeholder injects its literal text, so a missing property would silently
 * turn into the string "user", "password" or "broker-url" instead of staying
 * unset.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.activemq")
@Data
public class ActiveMqConfigProperties {
    /** Broker login name, bound from {@code spring.activemq.user}. */
    private String user;

    /** Broker login password, bound from {@code spring.activemq.password}. */
    private String password;

    /** Broker connection URL, bound from {@code spring.activemq.broker-url}. */
    private String brokerUrl;
}

3.配置MQ连接工厂和MQ消费者监听器工厂

package com.example.mq.config;

import com.example.mq.config.properties.ActiveMqConfigProperties;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;

/**
 * Declares the JMS beans used by this example: the broker connection factory,
 * a {@link JmsTemplate} for sending, and one listener container factory per
 * messaging style (point-to-point and publish/subscribe).
 */
@Configuration
public class ActiveMqConfig {

    /** Broker user, password and URL supplied by {@link ActiveMqConfigProperties}. */
    @Autowired
    ActiveMqConfigProperties activeMqConfigProperties;

    /**
     * Creates the connection factory from the configured user, password and broker URL.
     *
     * @return an {@link ActiveMQConnectionFactory} exposed under the {@link ConnectionFactory} interface
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(
                activeMqConfigProperties.getUser(),
                activeMqConfigProperties.getPassword(),
                activeMqConfigProperties.getBrokerUrl());
    }

    /**
     * Creates the message template that offers the send and receive operations.
     *
     * @return a {@link JmsTemplate} using the connection factory declared in this class
     */
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        return template;
    }

    /**
     * Creates the listener container factory for point-to-point (queue) consumers.
     *
     * <p>The parameter is typed as the {@link ConnectionFactory} interface rather than
     * the ActiveMQ implementation class, because the connection-factory bean in this
     * class is declared with the interface as its return type; depending on the
     * concrete class makes the injection fragile.
     *
     * @param activeMQConnectionFactory connection factory the listener containers will use
     * @return a container factory left in its default (queue) mode
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactoryQueue(ConnectionFactory activeMQConnectionFactory) {
        return newListenerContainerFactory(activeMQConnectionFactory, false);
    }

    /**
     * Creates the listener container factory for publish/subscribe (topic) consumers.
     *
     * @param activeMQConnectionFactory connection factory the listener containers will use
     * @return a container factory with the publish/subscribe domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopic(ConnectionFactory activeMQConnectionFactory) {
        return newListenerContainerFactory(activeMQConnectionFactory, true);
    }

    /** Builds a listener container factory, switching it to publish/subscribe mode only when requested. */
    private DefaultJmsListenerContainerFactory newListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                           boolean pubSubDomain) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        if (pubSubDomain) {
            // Only touched for topics so that the queue factory keeps its untouched default.
            factory.setPubSubDomain(true);
        }
        return factory;
    }
}

4.编写生产者代码

package com.example.mq.controller;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.Queue;
import javax.jms.Topic;

@RestController("producer")
public class ProducerController {
    @Autowired
    JmsTemplate jmsTemplate;

    @GetMapping("/sendByQueue")
    public String send(){
        //创建点对点队列
        Queue queue = new ActiveMQQueue("testSendQueueMsg");
        Destination destination = new ActiveMQQueue("testSendQueueMsg");

        for(int i=1;i<=10;i++){
            int finalI = i;
            jmsTemplate.send(destination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("生产者发送消息(点对点模式)" + finalI);
                }
            });
        }

        return "点对点消息发送成功";
    }

    @GetMapping("/sendByTopic")
    public String sendTopic(){
        //创建发布/订阅队列
        Destination destination = new ActiveMQTopic("testSendTopicMsg");

        for(int i=1;i<=10;i++){
            int finalI = i;
            jmsTemplate.send(destination, new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                            return session.createTextMessage("生产者发送消息(订阅模式)" + finalI);
                    }
               });
        }

        return "发布/订阅消息发送成功";
    }
}

5.编写消费者监听类

package com.example.mq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Message consumers for the demo destinations; each handler prints the text
 * payload it receives to standard output.
 */
@Component
public class ActiveMQListener {

    /**
     * Handles messages from the queue {@code testSendQueueMsg}, using the
     * container factory bean named {@code jmsListenerContainerFactoryQueue}.
     *
     * @param text payload of the received message
     */
    @JmsListener(containerFactory = "jmsListenerContainerFactoryQueue", destination = "testSendQueueMsg")
    public void receiverQueue(String text) {
        final String line = "点到点收到的消息为=" + text;
        System.out.println(line);
    }

    /**
     * Handles messages from the topic {@code testSendTopicMsg}, using the
     * container factory bean named {@code jmsListenerContainerFactoryTopic}.
     *
     * @param text payload of the received message
     */
    @JmsListener(containerFactory = "jmsListenerContainerFactoryTopic", destination = "testSendTopicMsg")
    public void receiverTopic(String text) {
        final String line = "发布/订阅收到的消息为=" + text;
        System.out.println(line);
    }
}

6.点对点方式执行结果

7.发布/订阅方式执行效果

 

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/584351.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号