栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ安装启动以及与springboot整合

RabbitMQ安装启动以及与springboot整合

第一步安装erlang环境:

下载:
由于rabbitmq是erlang来写的,所以需要安装erlang环境:
https://www.erlang-solutions.com/resources/download.html
安装:

yum -y install esl-erlang_23.0.2-1_centos_7_amd64.rpm

检测erlang:

erl
第二步安装RabbitMQ

下载:

http://www.rabbitmq.com/download.html

安装rabbitmq:

yum -y install rabbitmq-server-3.8.5-1.el7.noarch.rpm
查看rabbitmq的插件:
rabbitmq-plugins list
使用命令安装rabbitmq管理插件:rabbitmq-plugins enable rabbitma_management

启动rabbitmq:systemctl start rabbitmq-server.service
查看rabbitmq状态:systemctl status rabbitmq-server.service

访问ip:15672
用户名密码默认guest

出现警告:User can only log in via localhost
解决方法:
cd /etc/rabbitmq/
创建: vim rabbitmq.config

添加:
[{rabbit,[{loopback_users,[]}]}].


重启rabbitmq:systemctl restart rabbitmq-server.service

记得开放服务器防火墙和安全组的端口号!!!
管控台插件页面:

第三步整合springboot

创建springboot项目添加依赖:


        
            org.springframework.boot
            spring-boot-starter-amqp
        

application.yml添加配置:

#RabbitMq
  rabbitmq:
    #服务器
    host: *.*.*.*
    #用户名
    username: guest
    #密码
    password: guest
    #虚拟主机
    virtual-host: /
    #端口
    port: 5672
    listener:
      simple:
        #消费者最小数量
        concurrency: 10
        #消费者最大数量
        max-concurrency: 10
        #限制消费者每次只处理一条消息,处理完再继续下一条消息
        prefetch: 1
        #启动时是否默认启动容器,默认true
        auto-startup: true
        #被拒绝时重新进入队列
        default-requeue-rejected: true
    template:
      retry:
        #发布重试,默认false
        enabled: true
        #重试时间,默认1000ms
        initial-interval: 1000ms
        #重试最大次数,默认3次
        max-attempts: 3
        #重试最大间隔时间
        max-interval: 10000ms
        #重试的间隔乘数 比如2.0  第一次就等10s 第二次就等20s,第三次就等40s
        multiplier: 1
hello入门案例:

创建rabbitmq配置类:RabbitMQConfig

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;




@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue(){
        return new Queue("queue",true);
    }
}

创建生产者:MQSender

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object msg){
        log.info("发送消息"+msg);
        rabbitTemplate.convertAndSend("queue",msg);
    }

}

创建接收者:MQReceiver

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;



@Service
@Slf4j
public class MQReceiver {
    @RabbitListener(queues = "queue")
    public void receive(Object msg){
        log.info("接收消息:"+ msg);
    }
}

编写测试接口:

	@RequestMapping("/mq")
	@ResponseBody
	public void mq(){
		mqSender.send("Hello");
	}


在rabbitmq控制台就会有数据显示:

fanout模式

RabbitMQConfig配置类:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;




@Configuration
public class RabbitMQConfig {
    //fanout模式创建队列与交换机
    private static final String Queue01="queue_fanout01";
    private static final String Queue02="queue_fanout02";
    private static final String EXCHANGE="fanoutExchange";

    @Bean
    public Queue queue(){
        return new Queue("queue",true);
    }

    //创建队列与交换机实例
    @Bean
    public Queue queue01(){
        return new Queue(Queue01);
    }
    @Bean
    public Queue queue02(){
        return new Queue(Queue02);
    }
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCHANGE);
    }

    //交换机与队列进行绑定
    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue01()).to(fanoutExchange());
    }

    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue02()).to(fanoutExchange());
    }
}

生产者MQSender 生产者向交换机发送消息:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object msg){
        log.info("发送消息"+msg);
        rabbitTemplate.convertAndSend("queue",msg);
    }

    public void send02(Object msg){//使用交换机向队列推送消息
        log.info("发送消息"+msg);
        rabbitTemplate.convertAndSend("fanoutExchange","",msg);
    }

}

接收者MQReceiver :


@Service
@Slf4j
public class MQReceiver {
    @RabbitListener(queues = "queue")
    public void receive(Object msg){
        log.info("接收消息:"+ msg);
    }

    //获取队列中的消息
    @RabbitListener(queues = "queue_fanout01")
    public void receive01(Object msg){
        log.info("QUEUE01接收消息:"+msg);
    }

    //获取队列中的消息
    @RabbitListener(queues = "queue_fanout02")
    public void receive02(Object msg){
        log.info("QUEUE接收消息:"+msg);
    }
}

测试:

	 @RequestMapping("/mq/fanout")
	 @ResponseBody
	 public void mq01() {
	 	mqSender.send02("Hello");
	 }

交换机:

队列:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/741869.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号