Integrating RabbitMQ with Spring Boot (Spring Boot RabbitMQ configuration)


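Distributed transactions:

A distributed transaction is a transaction whose operations run on different nodes while the ACID properties still have to hold. For example, when placing an order, if the inventory and the order are not stored on the same node, a distributed transaction is involved.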

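Overall flow:

The order service inserts the order together with a redundant order-message row (status 0) and publishes the order to fanout_transaction_order_exchange. The publisher confirm callback sets that row to status 1 when the broker acks, or deletes the order and its row when the broker nacks. The inventory service consumes fanout_transaction_order_queue and deducts stock. If processing fails it rejects the message, which is dead-lettered to fanout_dead_exchange, and the order service's dead-letter handler then sets the order status to 2.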

Test tables:
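
Judging from the code that follows, the order-message redundancy table carries a status column: 0 = not yet delivered, 1 = delivered, 2 = stock deducted. A minimal in-memory sketch of the transitions driven by the publisher confirm is given here. OrderMessageLog and its method names are hypothetical stand-ins for illustration, not the article's mapper classes or table schema.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory stand-in for the order-message redundancy table (hypothetical). */
final class OrderMessageLog {
    static final int NOT_DELIVERED = 0;   // row inserted, broker has not confirmed yet
    static final int DELIVERED = 1;       // broker confirmed the publish
    static final int STOCK_DEDUCTED = 2;  // inventory service finished (listed in the article's comment)

    private final Map<Long, Integer> statusByOrderId = new HashMap<>();

    /** Called when the order is inserted: the message starts as "not delivered". */
    void recordNewOrder(long orderId) {
        statusByOrderId.put(orderId, NOT_DELIVERED);
    }

    /** Mirrors the confirm callback: an ack marks the row delivered, a nack removes it. */
    void onBrokerConfirm(long orderId, boolean ack) {
        if (!statusByOrderId.containsKey(orderId)) {
            return; // unknown order, nothing to update
        }
        if (ack) {
            statusByOrderId.put(orderId, DELIVERED);
        } else {
            statusByOrderId.remove(orderId);
        }
    }

    /** Returns the current status, or null if the row does not exist. */
    Integer statusOf(long orderId) {
        return statusByOrderId.get(orderId);
    }

    public static void main(String[] args) {
        OrderMessageLog log = new OrderMessageLog();
        log.recordNewOrder(100L);
        log.onBrokerConfirm(100L, true);
        System.out.println("status of order 100: " + log.statusOf(100L));
    }
}
```

Rows that stay at status 0 for a long time are the ones whose confirm never arrived, so a periodic job could look for them and decide whether to resend.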


Code:

Dead-letter exchange

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class DeadMq {

    @Bean
    public FanoutExchange fanoutDeadExchange(){
        return new FanoutExchange("fanout_dead_exchange");
    }

    @Bean
    public Queue deadQueue(){
        return new Queue("fanout_dead_order_queue");
    }

    @Bean
    public Binding deadBind(){
        return BindingBuilder.bind(deadQueue()).to(fanoutDeadExchange());
    }

}

Declare the order exchange and the order queue, with the queue dead-lettering to the dead-letter exchange

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;


@Configuration
public class TransactionalRabbitMqConfig {
    
    @Bean
    public FanoutExchange transactionFanoutExchange(){
        return new FanoutExchange("fanout_transaction_order_exchange");
    }

    
    @Bean
    public Queue transactionOrderQueue(){
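        Map<String, Object> args = new HashMap<>();
        //args.put("x-message-ttl",10000);// message TTL in milliseconds (int)
        // Attach the dead-letter exchange to this queue
        //args.put("x-max-length",5);// at most 5 messages; with the default overflow behavior the oldest ones are dead-lettered
        args.put("x-dead-letter-exchange","fanout_dead_exchange");// dead-letter exchange for this queue
        //args.put("x-dead-letter-routing-key","");// not needed for a fanout exchange
        // durable=true, exclusive=false, autoDelete=false
        return new Queue("fanout_transaction_order_queue",true,false,false,args);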
    }

    @Bean
    public Binding fanoutOrderBinding(){
        return BindingBuilder.bind(transactionOrderQueue()).to(transactionFanoutExchange());
    }
}
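
To make the routing configured above easier to picture, here is a toy in-memory model of dead-lettering with fanout exchanges. It is only a sketch: DeadLetterModel and its methods are invented for illustration and are not the RabbitMQ or Spring AMQP API. The queue and exchange names in the usage are the ones declared in this article.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of fanout routing plus dead-lettering (hypothetical, not a broker client). */
final class DeadLetterModel {
    /** queue name -> pending message bodies */
    private final Map<String, Deque<String>> queues = new HashMap<>();
    /** fanout exchange name -> names of bound queues */
    private final Map<String, List<String>> bindings = new HashMap<>();
    /** queue name -> value of its x-dead-letter-exchange argument */
    private final Map<String, String> deadLetterExchange = new HashMap<>();

    /** Declares a queue; dlx may be null when the queue has no dead-letter exchange. */
    void declareQueue(String queue, String dlx) {
        queues.put(queue, new ArrayDeque<>());
        if (dlx != null) {
            deadLetterExchange.put(queue, dlx);
        }
    }

    void bind(String queue, String exchange) {
        bindings.computeIfAbsent(exchange, k -> new ArrayList<>()).add(queue);
    }

    /** Fanout publish: every bound queue receives a copy, routing keys are ignored. */
    void publish(String exchange, String body) {
        for (String q : bindings.getOrDefault(exchange, Collections.<String>emptyList())) {
            Deque<String> target = queues.get(q);
            if (target != null) {
                target.addLast(body);
            }
        }
    }

    /** Models a consumer rejecting the head message with requeue=false. */
    void rejectHead(String queue) {
        String body = queues.get(queue).pollFirst();
        if (body == null) {
            return; // queue was empty
        }
        String dlx = deadLetterExchange.get(queue);
        if (dlx != null) {
            publish(dlx, body); // republished through the dead-letter exchange
        }
        // without a dead-letter exchange the message is simply dropped
    }

    int depth(String queue) {
        return queues.get(queue).size();
    }

    public static void main(String[] args) {
        DeadLetterModel m = new DeadLetterModel();
        m.declareQueue("fanout_dead_order_queue", null);
        m.bind("fanout_dead_order_queue", "fanout_dead_exchange");
        m.declareQueue("fanout_transaction_order_queue", "fanout_dead_exchange");
        m.bind("fanout_transaction_order_queue", "fanout_transaction_order_exchange");
        m.publish("fanout_transaction_order_exchange", "order-1");
        m.rejectHead("fanout_transaction_order_queue");
        System.out.println("dead queue depth: " + m.depth("fanout_dead_order_queue"));
    }
}
```

Note that in this model, as in the configuration above, the dead-letter queue itself has no dead-letter exchange, so a message rejected there without requeue is gone.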

Order service: publishes orders to MQ and handles dead-lettered orders

import com.model.MyOrder;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;


@RabbitListener(queues = {"fanout_dead_order_queue"})
public interface OrderService {

    
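    // Saves the order locally and publishes it to the order exchange
    void transactionOrder(MyOrder myOrder);

    // Handles orders that arrive on the dead-letter queue
    @RabbitHandler
    void deadOrderHandler(String msg, Channel channel, Message message);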
}
	
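    // Methods from the OrderService implementation class; orderMapper, orderTmrMapper and rabbitTemplate are fields of that class.
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void transactionOrder(MyOrder myOrder) {
        String exchangeName="fanout_transaction_order_exchange";
        MyOrderTmr curTmrOrder = new MyOrderTmr().setOrderId(myOrder.getOrderId()).setCreateTime(new Date()).setStatus(0);
        // Insert the order
        int orderFlag = orderMapper.insert(myOrder);
        if (orderFlag>0){
            // Insert the order-message row in the redundancy table, which tracks message handling status:
            // 0 = not yet delivered (default), 1 = delivered, 2 = inventory service deducted stock successfully
            int orderTmrFlag = orderTmrMapper.insert(curTmrOrder);
            if (orderTmrFlag>0){
                // Publish the message to the exchange; the order ID is carried as the correlation ID for the confirm callback
                rabbitTemplate.convertAndSend(exchangeName,"", JsonUtil.toStr(myOrder),new CorrelationData(String.valueOf(myOrder.getOrderId())));
            }
        }
    }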

    @Override
    public void deadOrderHandler(String msg, Channel channel, Message message) {
        try {
            System.out.println("Handling failed message");
            System.out.println("Message body: "+msg);
            MyOrder order = JsonUtil.toObj(msg, MyOrder.class);
            orderMapper.updateByPrimaryKeySelective(order.setStatus(2));
            System.out.println("Order status set to 2");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // Handling failed: reject without requeue. The dead-letter queue declares no
            // dead-letter exchange of its own, so the broker discards the message.
            try {
                System.out.println("Handling failed, rejecting message");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),
                        false, false);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            System.err.println("get msg1 failed msg = "+msg);
            e.printStackTrace();
        }
    }

    
    // Registers the publisher confirm callback once the bean has been constructed
    @PostConstruct
    public void regCallBack(){
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("cause:"+cause);
            assert correlationData != null;
            String orderId = correlationData.getId();
            // ack == true means the broker accepted the message
            if (!ack){
                System.out.println("Broker confirm failed, OrderId:"+orderId+", deleting the order data");
                orderMapper.deleteByPrimaryKey(new MyOrder().setOrderId(Long.parseLong(orderId)));
                orderTmrMapper.deleteByPrimaryKey(new MyOrderTmr().setOrderId(Long.parseLong(orderId)));
                return;
            }
            try {
                System.out.println("Broker confirm succeeded, setting redundancy row status=1, OrderId:"+orderId);
                // Update the redundancy row to record that the broker received the message
                orderTmrMapper.updateByPrimaryKeySelective(new MyOrderTmr().setOrderId(Long.parseLong(orderId)).setStatus(1));
            }catch (Exception ex){
                System.out.println("Error while updating the local message row: "+ex.getMessage());
            }
        });
    }
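
The confirm callback and the manual basicAck/basicNack calls above only take effect if publisher confirms and manual acknowledgement are switched on. The article does not show its configuration, so the following application.properties fragment is an assumption: the property names are the ones used by Spring Boot 2.2 and later (older versions used spring.rabbitmq.publisher-confirms=true instead), and the host and port are placeholders.

```properties
# Placeholder connection settings, adjust to your broker
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672

# Correlated publisher confirms, so the ConfirmCallback receives the CorrelationData
spring.rabbitmq.publisher-confirm-type=correlated

# Manual acknowledgement, so the listener decides when to call basicAck or basicNack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
```

With the default automatic acknowledgement mode the container would ack or reject on its own, and the explicit channel calls in the handlers would conflict with it.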

Inventory service on another node:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;


@RabbitListener(queues = {"fanout_transaction_order_queue"})
public interface TransactionalOrderConsumer {

    @RabbitHandler
    void orderHandler(String msg, Channel channel, Message message);

}

import com.consumer.TransactionalOrderConsumer;
import com.mapper.ProductMapper;
import com.model.MyOrder;
import com.model.Product;
import com.rabbitmq.client.Channel;
import com.util.JsonUtil;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;


@Service("transactionalOrderConsumer")
public class TransactionalOrderConsumerImpl implements TransactionalOrderConsumer {

    @Resource
    ProductMapper productMapper;

    
    // Deducts stock for the received order, then acks on success or rejects on failure
    @Override
    public void orderHandler(String msg, Channel channel, Message message) {
        // Failure flag: stays false on success, set to true if processing throws
        boolean handlerFlag=false;
        long msgTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("Message body: "+msg);
            // Uncomment to force an error and test dead-lettering
            //int i=1/0;
            MyOrder order = JsonUtil.toObj(msg, MyOrder.class);
            Product curProduct = productMapper.selectByPrimaryKey( new Product().setId(order.getProductId()));
            // Decrement the product count
            Product product1 = curProduct.setCount(curProduct.getCount()-1);
            productMapper.updateByPrimaryKey(product1);
            System.out.println("Product count updated: "+product1);
        } catch (Exception e) {
            handlerFlag=true;
            e.printStackTrace();
        }finally {
            if (handlerFlag){
                // Processing failed: reject so the broker dead-letters the message
                toReject(channel, msgTag);
            }else {
                // Processing succeeded: acknowledge the message
                toDo(channel, msgTag);
            }
        }
    }

    
    // Rejects a single message without requeueing it, so the broker routes it to the dead-letter exchange
    private void toReject(Channel channel,Long msgTag){
        try {
            channel.basicNack(msgTag, false,false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Acknowledges a single message
    private void toDo(Channel channel,Long msgTag){
        try {
            channel.basicAck(msgTag, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
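
The ack-or-reject decision in the consumer can also be expressed as one small helper: run the work, ack if it returns normally, reject without requeue if it throws. The sketch below uses a hypothetical Acker interface in place of com.rabbitmq.client.Channel so that it runs without a broker. AckDecision, Acker and RecordingAcker are names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

final class AckDecision {
    /** Hypothetical minimal view of a channel; the real type is com.rabbitmq.client.Channel. */
    interface Acker {
        void ack(long deliveryTag);
        void nack(long deliveryTag, boolean requeue);
    }

    /**
     * Runs the work, then acks on normal completion or nacks without requeue
     * when the work throws. Returns true if the message was acked.
     */
    static boolean handle(long deliveryTag, Runnable work, Acker acker) {
        try {
            work.run();
        } catch (RuntimeException e) {
            acker.nack(deliveryTag, false); // requeue=false lets the broker dead-letter it
            return false;
        }
        acker.ack(deliveryTag);
        return true;
    }

    /** Records the calls it receives, for demonstration only. */
    static final class RecordingAcker implements Acker {
        final List<String> calls = new ArrayList<>();

        @Override
        public void ack(long deliveryTag) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void nack(long deliveryTag, boolean requeue) {
            calls.add("nack:" + deliveryTag + ":" + requeue);
        }
    }

    public static void main(String[] args) {
        RecordingAcker acker = new RecordingAcker();
        handle(1L, () -> { }, acker);
        handle(2L, () -> { throw new IllegalStateException("stock update failed"); }, acker);
        System.out.println(acker.calls);
    }
}
```

Keeping the decision in one place makes it harder to swap the two branches by accident, since success and failure each have exactly one exit path.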

Test entry point

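import com.model.MyOrder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
// OrderService and SnowflakeIdWorker must also be imported from the project's own packages

@RestController
public class OrderController {
    @Resource
    private OrderService orderService;

    @GetMapping("/sendOrder")
    public String order(){
        String orderName="Buy Baixiang instant noodles";
        long orderId = SnowflakeIdWorker.id();
        long productId = 1L;
        MyOrder curMyOrder = new MyOrder().setOrderId(orderId).setProductId(productId).setOrderName(orderName).setCreateTime(new Date());
        orderService.transactionOrder(curMyOrder);
        return "Order created successfully";
    }

}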