栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot2.x RabbitMQ Nacos Nacos-Config

SpringBoot2.x RabbitMQ Nacos Nacos-Config


文章目录
          • 一、依赖配置
            • 1. 引入依赖
            • 2. 配置文件
            • 3. 主配置
          • 二、生产者代码 Coding
            • 2.1. 发送客户端
            • 2.2. 确认机制
            • 2.3. 消息 return机制
            • 2.4. controller
            • 2.5. MQ工具类
            • 2.6. 常量类
          • 三、消费端
            • 3.1. 消费者代码
            • 3.2. RabbitMQ常用命令

一、依赖配置 1. 引入依赖
        
        
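        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>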
        
2. 配置文件

项目内部配置bootstrap.yml,

server:
  port: 8001
spring:
  application:
    # 应用名称
    name: ly-rabbitmq
  profiles:
    # 环境配置
    active: dev
  cloud:
    nacos:
      discovery:
        # 服务注册地址
        server-addr: nacos.server.com:8848
      config:
        # 配置中心地址
        server-addr: nacos.server.com:8848
        # 配置文件格式
        file-extension: yml
        # 共享配置
        shared-configs:
          - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

nacos-config服务端配置

在这里插入代码片
3. 主配置
package com.gblfy.lyrabbitmq.config;

import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Declares the RabbitMQ topology used by this service: one topic exchange,
 * one queue, and the binding that connects them.
 */
@Configuration
public class RabbitTopicConfig {

    /** Name of the queue that receives the "ly-fai" messages. */
    private static final String FAI_QUEUE_NAME = "ly_mq_fai_q";

    /** Topic pattern: every routing key that starts with "ly-fai." matches. */
    private static final String FAI_ROUTING_PATTERN = "ly-fai.#";

    /**
     * Topic exchange named by {@link MQPrefixConst#WS_EXCEHANGE}.
     * The two flags are passed as durable = true and autoDelete = false.
     */
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(MQPrefixConst.WS_EXCEHANGE, true, false);
    }

    /** Queue {@code ly_mq_fai_q}. */
    @Bean
    Queue hisUQ() {
        return new Queue(FAI_QUEUE_NAME);
    }

    /**
     * Binds {@code ly_mq_fai_q} to the topic exchange with the pattern
     * {@code ly-fai.#}, so a message whose routing key starts with
     * "ly-fai." is routed to that queue.
     */
    @Bean
    Binding hisUQBinding() {
        return BindingBuilder.bind(hisUQ()).to(topicExchange()).with(FAI_ROUTING_PATTERN);
    }
}

二、生产者代码 Coding 2.1. 发送客户端
package com.gblfy.lyrabbitmq.provider;

import com.alibaba.fastjson.JSON;
import com.gblfy.common.entity.Order;
import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import com.gblfy.lyrabbitmq.utils.MQSendMsgUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;


/**
 * Producer-side service that turns order data into a JSON message and hands
 * it to {@link MQSendMsgUtils} for publishing.
 */
@Service
public class RabbitMQProvider {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQProvider.class);

    @Autowired
    private MQSendMsgUtils mqSendMsgUtils;

    /**
     * Builds an {@code Order} from the arguments, serializes it to JSON and
     * publishes it with the routing key {@link MQPrefixConst#LY_MQ_FAI_QUERY}.
     *
     * @param orderId    order identifier
     * @param orderNum   order number
     * @param createTime order creation time
     */
    public void sendMQContent(long orderId, String orderNum, LocalDateTime createTime) {
        Order payload = Order.builder()
                .orderId(orderId)
                .orderNum(orderNum)
                .createTime(createTime)
                .build();
        String json = JSON.toJSONString(payload);

        // The exchange routes the message to a queue based on the routing key.
        mqSendMsgUtils.snedStrMQMsg(MQPrefixConst.LY_MQ_FAI_QUERY, json);

        // Emitted once the helper returns; this line alone does not prove broker delivery.
        log.info("MQ消息发送成功");
    }
}

2.2. 确认机制
package com.gblfy.lyrabbitmq.confirms;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;


@Component("/confirm/iCallback")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {

    //日志输出
    private final static Logger log = LoggerFactory.getLogger(/confirm/iCallBackListener.class);

    
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {

        log.info("消息队列标识ID: {}", correlationData.getId());

        
        if (ack) {
            log.info("发送消息成功: {}", ack);
        } else {
            log.info("发送消息失败: {}", ack);
        }
    }
}

2.3. 消息 return机制
package com.gblfy.lyrabbitmq.returns;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;


@Component("returnCallBackListener")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {

    //打印日志 实时定位
    private final static Logger log = LoggerFactory.getLogger(ReturnCallBackListener.class);

    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

        log.info("DeliveryTag: {}", message.getMessageProperties().getDeliveryTag());
        log.info("ContentType: {}", message.getMessageProperties().getContentType());
        log.info("ContentEncoding: {}", message.getMessageProperties().getContentEncoding());

        log.info("消息发送的指定交换机: {}", exchange);
        log.info("队列路由的routingKey: {}", routingKey);
        log.info("队列的响应码replyCode: {}", replyCode);
        log.info("队列的响应信息: {}", replyText);
        //TODO 消息发送交换机成功 路由失败 保存轨迹记录
    }
}

2.4. controller
package com.gblfy.lyrabbitmq.controller;

import com.gblfy.lyrabbitmq.provider.RabbitMQProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

/**
 * Demo REST endpoint that triggers publishing of a sample order message.
 */
@RestController
@RequestMapping("/mq")
public class MQProviderController {

    @Autowired
    private RabbitMQProvider mqProvider;

    /**
     * Publishes one sample order (id 1, order number "10", created now).
     *
     * @return the literal string "OK" once the provider call returns
     */
    @GetMapping("/sendMQ")
    public String sendMQContent() {
        // A plain long literal: the former 0001 was an octal int literal, which
        // would silently change value for ids such as 0010.
        mqProvider.sendMQContent(1L, "10", LocalDateTime.now());
        return "OK";
    }
}

2.5. MQ工具类
package com.gblfy.lyrabbitmq.utils;

import com.gblfy.lyrabbitmq.confirms.ConfirmCallBackListener;
import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import com.gblfy.lyrabbitmq.returns.ReturnCallBackListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;


@Component
public class MQSendMsgUtils {

    private final static Logger log = LoggerFactory.getLogger(MQSendMsgUtils.class);

    @Autowired//注入发送消息模板
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ConfirmCallBackListener /confirm/iCallback;
    @Autowired
    private ReturnCallBackListener returnCallback;

    
    public void snedStrMQMsg(String queueRouteKey, String msg) {
        try {
            log.info("交换机名称: {}, 路由routingKey: {}, 发送的消息: {} ", "EXCHANGE-CMIIP", queueRouteKey, msg);
            String mID = UUID.randomUUID().toString();
            CorrelationData correlationId = new CorrelationData(mID);

            // Confirm 消息确认策略
            rabbitTemplate.set/confirm/iCallback(/confirm/iCallback);
            // Return 消息确认策略
            rabbitTemplate.setReturnCallback(returnCallback);
            log.info("发送消息的路由key: {}", queueRouteKey);
            log.info("发送消息的标识ID: {}", mID);

            //发送消息到MQ的交换机,通知其他系统
            rabbitTemplate.convertAndSend(MQPrefixConst.WS_EXCEHANGE, queueRouteKey, msg, correlationId);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2.6. 常量类
package com.gblfy.lyrabbitmq.consts;


/**
 * Constants shared by the producer and the consumer: the exchange name and
 * the routing key. This class only holds constants and cannot be
 * instantiated or extended.
 */
public final class MQPrefixConst {

    /** Exchange name (the original comment marks it as the regression environment). */
    public static final String WS_EXCEHANGE = "LY-REPORT-EXCHANGE";

    /** Routing key. */
    public static final String LY_MQ_FAI_QUERY = "ly-fai.query";

    private MQPrefixConst() {
        // Constants holder: no instances.
    }
}


三、消费端 3.1. 消费者代码
package com.gblfy.lyrabbitmq.consumer;

import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;



/**
 * Consumer that receives messages from the queue/exchange/routing-key
 * binding configured under {@code spring.rabbitmq.listener.str1.*} and
 * acknowledges each message after it has been handled.
 */
@Component
public class RabbitMQHandler implements ChannelAwareMessageListener {
    // Logger used to trace consumed messages.
    private static final Logger log = LoggerFactory.getLogger(RabbitMQHandler.class);

    /**
     * Handles one delivery: decodes the body, dispatches on the received
     * routing key, then acknowledges the message.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on, used for the ack
     * @throws Exception if a channel operation or the processing fails; in
     *                   that case the message is not acknowledged
     */
    @Override
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.str1.queue.name}",
                    durable = "${spring.rabbitmq.listener.str1.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.str1.exchange.name}",
                    durable = "${spring.rabbitmq.listener.str1.exchange.durable}",
                    type = "${spring.rabbitmq.listener.str1.exchange.type}",
                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.str1.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.str1.key}"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {

        // TODO: on successful receipt, store the message trace in a consumer-side trace table.
        // Decode with an explicit charset instead of the platform default.
        String jsonMsg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("响应报文 mResXml: {}", jsonMsg);

        // Ask the broker for at most one unacknowledged message at a time.
        // NOTE(review): this is issued on every delivery; prefetch is usually
        // configured on the listener container — confirm this call is needed.
        channel.basicQos(1);

        //------------------------------ parse the agreed fields ------------------------------
        // JSonObject jsonObject = new JSonObject();
        // jsonObject = JSON.parseObject(jsonMsg);

        // String msgID = jsonObject.getString("msgID");
        // log.info("接收的消息ID: {}", msgID);
        //
        // String tResXml = jsonObject.getString("tResXml");
        // log.info("解析后的zip路径: {}", tResXml);

        String queueRouteKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info("接收的路由key: {}", queueRouteKey);

        if (MQPrefixConst.LY_MQ_FAI_QUERY.equals(queueRouteKey)) {
            // TODO: business logic for the query interface.

            // TODO: persist the data to the database.
        } else {
            log.error("无此路由key: {}", queueRouteKey);
        }

        // Acknowledge only after handling, so that a failure above leaves the
        // message unacknowledged instead of silently dropping it.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
3.2. RabbitMQ常用命令
# 启动MQ
rabbitmq-server -detached
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278404.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号