栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Springboot 整合 RabbitMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Springboot 整合 RabbitMQ

概念解释:

Broker: 消息队列服务器实体生产者(Producer):发送消息的应用消费者(Consumer):接收消息的应用队列(Queue):保存消息并将它们转发给消费者消息(Message):服务与应用程序之间传送的数据,由消息头和消息体组成。消息体是不透明的,消息头由一些列可选属性组成,这些属性包括:routing-key(路由键)、priority(优先级)、delivery-mode(消息是否可持久性存储)连接(Connection):连接RabbitMQ和应用服务器的TCP连接通道(Channel):网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务交换机(Exchange):从生产者那里接收消息,并根据交换类型分发到对应的消息列队里,要实现消息的接收,一个队列必须绑定一个交换机,不具备消息存储的能力绑定(Binding):绑定是队列和交换机的一个链接。路由键(Routing Key):路由键是供交换机查看并根据键的值来决定如何分发消息到列队的一个键。用户(Users):在RabbitMQ里,是可以通过指定的用户名和密码来进行连接的。每个用户可以分配不同的权限,例如读权限,写权限以及在实例里进行配置的权限虚拟主机(Virtual Host):用于进行逻辑隔离,最上层的消息路由,一个虚拟主机可以有若干个Exchange和Queue,同一个虚拟主机里不能有相同名字的Exchange

工作模式:

简单模式(Simple ):P - > Q ->C,单对单。工作队列模式(Work):多个消费端,消费同一个队列中的消息,采用轮询分发或公平分发
–轮询分发:服务器的处理能力没有影响,一人一条按均分配
–公平分发:根据消费者能力进行分发,多劳多得发布订阅 (Fanout ):给绑定得所有Queue发送消息路由模式 (Direct):发布订阅模式+Routing Key ,给指定路由的key发送消息主题模式(topic) :模糊匹配路由Key
– # :0个或者多个,多级(.xx.xx.xx),如 #.com 匹配,com、aa.com、aa.bb.com
– * : 至少>1 满足,必须要有一级(.xx) 如 *.com 匹配,a.com根据参数条件匹配(header模式):取消routingkey,使用header中的 key/value(键值对)匹配队列
–RPC:消息主动拉取publish /confirm/i: 发布确认模式

消息确认机制 ack:

默认情况为自动应答,收到消息时,返回给 mq 确认收到ack! 当接口报错时,还是会自动应该,然后退回mq,mq充重试然后一个循环报错…一般生成环境建议手动ack,当消息正常消费时,返回确认。

整合开发


    org.springframework.boot
    spring-boot-starter-amqp

spring:

  rabbitmq:
    host: 192.168.1.100
    username: guest
    password: guest
    virtual-host: /
    publisher-/confirm/i-type: correlated  #手动应答,自动ack
    publisher-returns: false #失败退回
    #开启手动ACK
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual

配置交换机、队列、绑定

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMqConfig {

    
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_order_exchange",true,false);
    }

    
    @Bean
    public Queue orderQueue(){
        return new Queue("order.fanout.queue");
    }

    
    @Bean
    public Binding orderBinding(){
        return BindingBuilder.bind(orderQueue()).to(fanoutExchange());
    }
    
}

消息发送:

消息投递成功确认rabbitTemplate.setConfirmCallback(correlationData, ack, cause) -> {...}

@Service("orderServerImpl")
public class OrderServerImpl implements OrderService {

    @Resource
    RabbitTemplate rabbitTemplate;

    
    @Override
    public void makerOrder(String userName, String productId, int number) {
        String orderId= UUID.randomUUID().toString();
        System.out.println("订单生产成功");
        String exchangeName="fanout_order_exchange";
        String routingKey="";
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(orderId);
        rabbitTemplate.convertAndSend(exchangeName,routingKey,msg,correlationData);
    }
    
    
	@Override
    public void makerOrderDirect(String userName, String productId, int number) {
        String orderId= UUID.randomUUID().toString();
        System.out.println("订单生产成功-路由模式:key-order");
        String exchangeName="direct_order_exchange";
        String routingKey="order";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
    }
    
    
    @Override
    public void ttlMessageMakerOrderDirect(String userName, String productId, int number) {
        String orderId= UUID.randomUUID().toString();
        System.out.println("订单生产成功-路由模式-单消息超时设置:key-order");
        String exchangeName="ttl_direct_exchange";
        String routingKey="ttl-message";
        //设置消息过去时间
        MessagePostProcessor messagePostProcessor = message -> {
            message.getMessageProperties().setExpiration("5000");
            message.getMessageProperties().setContentEncoding("UTF-8");
            return message;
        };
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor);
    }
     
    @PostConstruct
    public void regCallBack(){
        rabbitTemplate.set/confirm/iCallback((correlationData, ack, cause) -> {
            System.out.println("cause:"+cause);
            String orderId = correlationData.getId();
            //ack 为true代表确认消息投递到MQ
            if (!ack){
                System.out.println("MQ队列应答--失败,OrderId:"+orderId);
                return;
            }
            try {
                System.out.println("MQ队列应答--成功, 修改订单冗余状态,OrderId:"+orderId);
            }catch (Exception ex){
                System.out.println("本地消息出现异常"+ex.getMessage());
            }
        });
    }
}

消息接收:一般在另一服务

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;


@RabbitListener(queues = {"order.fanout.queue"})
public interface OrderConsumer {

    @RabbitHandler
    void orderMessageHandler(String meeage, Channel channel, Message message);

}

消息消费确认

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

import com.rabbitmq.client.Channel;
import com.service.OrderConsumer;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Map;

@Service("orderConsumer")
public class OrderConsumerImpl implements OrderConsumer {

    @Override
    public void orderMessageHandler(String msg, Channel channel, Message message) {
        try {
            //确认消息处理,返回ack
            System.out.println("消息内容:"+msg);
            Map headers = message.getMessageProperties().getHeaders();
            Object correlation = headers.get("spring_returned_message_correlation");
            System.out.println("附属id:"+correlation);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            //处理异常,拒绝确认消息
            try {
                System.out.println("处理失败,拒绝消息");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),
                        false, false);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            System.err.println("get msg1 failed msg = "+msg);
            e.printStackTrace();
        }
    }
}

死信交换机(Dead-Letter-exchange):当一个消息在队列中变成一个死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是死信交换机,与之绑定的队列称为死信队列。

消息被拒绝消息过期队列达到最大长度》当达到这3种条件时,消息就会转移到死信交换机 - > 死信队列 - > 死信消费者处理消息

@Configuration
public class DeadRabbitMqConfig {

    
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange("dead_direct_exchange",true,false);
    }

    
    @Bean
    public Queue deadQueue(){
        return new Queue("dead.direct.queue",true);
    }


    
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }

}
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;


@Configuration
public class TTLRabbitMqConfig {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl_direct_exchange",true,false);
    }

    //队列设置过期时间 x-message-ttl
    //exclusive 排他性
    //设置整个Queue里消息得过去时间ms
    //如果队列已经创建,更新参数,需要删除后重新创建; 或者重新创建一个新的,存入新的,注意数据的转移!
    @Bean
    public Queue ttlDirectOrderQueue(){
        Map args = new HashMap<>();
        args.put("x-message-ttl",10000);//过期时间int 类型
        //绑定死信交换机
        args.put("x-max-length",5);//最大长度5条,超过进入死信队列
        args.put("x-dead-letter-exchange","dead_direct_exchange");//指定死信交换机
        args.put("x-dead-letter-routing-key","dead");//fanout 不用配置
        return new Queue("ttl_direct.queue",true,false,false,args);
    }

    @Bean
    public Queue ttlDirectMessageOrderQueue(){
        return new Queue("ttl_message_direct.queue",true);
    }

    @Bean
    public Binding ttlDirectOrderBinding(){
        return BindingBuilder.bind(ttlDirectOrderQueue()).to(ttlDirectExchange()).with("ttl");
    }

    @Bean
    public Binding ttlDirectMessageOrderBinding(){
        return BindingBuilder.bind(ttlDirectMessageOrderQueue()).to(ttlDirectExchange()).with("ttl-message");
    }
}

应用:如 延迟队列,订单未超时未支付,将订单放到死信队列中,后面再对此订单做处理。

内存磁盘监控:

注意程序的死循环内存磁盘满时,(内存)消息的发送,(磁盘)消息无法持久化内存告警时,链接会被 blocking 挂起.
配置文件方式修改(需要重启):(建议:0.4~0.7)内存相对值:vm_memory_high_watermark.relative = 0.6内存绝对值:vm_memory_high_watermark.absolute = 2GB命令行方式修改:内存相对值:rabbitmqctl set_vm_memory_high_watermark 0.6内存绝对值:rabbitmqctl set_vm_memory_high_watermark absolute 2GB


磁盘空间修改:rabbitmqctl set_disk_free_limit disk_limit:固定单位 KB MB GBrabbitmqctl set_disk_free_limit memory_limit fraction:是相对阈值,建议范围在:1.0~2.0之间。(相对于内存)disk_free_limit.relative = 3.0disk_free_limit.absolute = 50MB (小于50M会爆红)

内存换页概念:

在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。(磁盘空间换内存空间)默认情况内存的阀值是0.4时,当内存超过0.4*0.5=0.2时,就会进行换页动作。如:1000MB -》 400MB -》 200MB转移到磁盘中

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/769348.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号