栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq实战指南(rabbitmq入门书籍)

rabbitmq实战指南(rabbitmq入门书籍)

RabbitMQ入门 1.什么是消息队列?

消息队列是在消息的传送过程中保存消息的容器。在消息队列中,通常有生产者,消费者模型,生产者不断的向消息队列中发送消息,消费者不断的从队列中获取消息,由于消息的生产和接收都是异步的,实现了生产者和消费者之间的解耦

2.为什么使用消息队列?

解耦:在我们开发的过程中,我们有的时候可能会遇到和其他系统进行数据的交互,我们一般情况下会调用对方系统的接口将数据传输过去,这样看着是没啥问题的,可是耦合性比较高,假如某天我们不需要传数据了或者需要多加一个系统,这个时候我们就需要修改代码,如果使用消息队列,则不需要关注这么多,我们只需要将数据放到队列中,谁需要则谁去取。流量削峰:假设在同一时刻大量的请求发送过来,这样数据库就会执行大量的SQL,有可能会导致数据库崩溃,在使用MQ的情况下,系统先将数据发送到MQ,然后由消费者拉取数据到数据库执行,防止大批量SQL执行导致数据库崩溃。异步执行:当我们在淘宝上买东西的时候,可以分为生成订单,扣减库存,扣费,物流,发送短信等一系列步骤,如果同步执行,那么耗费的时间是很长的,很影响客户的使用,使用Mq的情况下可以将一下非必要的业务的数据放到MQ里,例如发送短信,直接返回成功,提高了客户的体验。 3.什么是RabbitMQ?

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现

RabbitMQ 是一个消息代理:它接受和转发消息。您可以将其视为邮局:当您将要邮寄的邮件放入邮箱时,您可以确定邮递员最终会将邮件投递给您的收件人。在这个类比中,RabbitMQ 是一个邮箱、一个邮局和一个邮递员。

RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块,消息。

4.六种消息模型

首先导入maven依赖


    com.rabbitmq
    amqp-client
    5.13.1

4.1 基本消息模型

P代表生产者,红色部分为消息队列,C代表消费者

RabbitMq连接工具类

package org.best.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMqUtil {

    private static final String RABBITMQ_HOST="127.0.0.1";

    private static final Integer RABBITMQ_PORT=5672;

    private static final String RABBITMQ_USERNAME="best";

    private static final String RABBITMQ_PASSWORD="best1103";

    private static final String RABBITMQ_VIRTUAL_HOST="/best";

    public static Connection getConnection() throws IOException, TimeoutException {

        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置ip和端口
        connectionFactory.setHost(RABBITMQ_HOST);
        connectionFactory.setPort(RABBITMQ_PORT);
        //设置用户名,密码,虚拟主机
        connectionFactory.setUsername(RABBITMQ_USERNAME);
        connectionFactory.setPassword(RABBITMQ_PASSWORD);
        connectionFactory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //返回连接
        return  connection;

    }
}

生产者

package org.best.basic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    //队列名称
    private static final  String  BASIC_QUEUE_NAME="basicQueue";

    //发送的消息内容,一般在我们的开发中这个消息是不固定的,一般来说是传过来的参数或者数据库里的内容
    private static final String MESSAGE="hello word";

    public static void main(String[] args) {
        Connection connection=null;
        Channel channel=null;
        try {
            //获得连接
            connection= RabbitMqUtil.getConnection();
            //创建管道
            channel = connection.createChannel();
            
            //声明队列
            channel.queueDeclare(BASIC_QUEUE_NAME,false,false,false,null);
            
            //发送消息
            channel.basicPublish("",BASIC_QUEUE_NAME,null,MESSAGE.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            //关闭连接
            try {
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者

package org.best.basic;

import com.rabbitmq.client.*;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    //队列名称
    private static final  String  BASIC_QUEUE_NAME="basicQueue";

    public static void main(String[] args) {
        Connection connection=null;
        Channel channel =null;
        try {
            //获取连接
             connection= RabbitMqUtil.getConnection();
            //创建管道
            channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(BASIC_QUEUE_NAME,false,false,false,null);
            //消费消息,第一个参数代表队列名称,第二个参数代表是否自动确认消息
            channel.basicConsume(BASIC_QUEUE_NAME,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //打印消息
                    System.out.println(new String(body));
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            //关闭连接
            try{
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
4.2 工作队列消息模型

仔细看一下图片其实发现工作队列消息模型和基本消息模型的区别在于工作队列消息模型有多个消费者,当有多个消费者的时候,消息会被哪个消费者消费呢

1.轮询模式:将消息轮流发送到每个消费者

2.公平调度:根据消费者的消费能力进行分发,在消费者没有确认后,不要向消费者发送新消息

轮询模式

生产者

package org.best.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private static final String RABBITMQ_QUEUE="workQueue";
    private static final String MESSAGE="hello";
    public static void main(String[] args) {
        try {
            //获得连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(RABBITMQ_QUEUE,false,false,false,null);
            //发送消息
            for (int i=0;i<100;i++) {
                channel.basicPublish("",RABBITMQ_QUEUE,null,(MESSAGE+i).getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

消费者1

package org.best.work;

import com.rabbitmq.client.*;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    private static final String RABBITMQ_QUEUE="workQueue";
    public static void main(String[] args) {
        try {
            //获得连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(RABBITMQ_QUEUE,false,false,false,null);
            
             // 消费消息
            channel.basicConsume(RABBITMQ_QUEUE,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        //休眠2s
                        Thread.sleep(2000);
                        //控制台打印消息
                        System.out.println(new String(body));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

消费者2

package org.best.work;

import com.rabbitmq.client.*;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    private static final String RABBITMQ_QUEUE="workQueue";
    public static void main(String[] args) {
        try {
            //获取连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(RABBITMQ_QUEUE,false,false,false,null);
            
            // 消费消息
            channel.basicConsume(RABBITMQ_QUEUE,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //控制台打印消息
                    System.out.println(new String(body));
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

公平调度

生产者

package org.best.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private static final String RABBITMQ_QUEUE="workQueue";
    private static final String MESSAGE="hello";
    public static void main(String[] args) {
        try {
            //获得连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(RABBITMQ_QUEUE,false,false,false,null);
            //发送消息
            for (int i=0;i<100;i++) {
                channel.basicPublish("",RABBITMQ_QUEUE,null,(MESSAGE+i).getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

消费者1

package org.best.work;

import com.rabbitmq.client.*;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    private static final String RABBITMQ_QUEUE="workQueue";
    public static void main(String[] args) {
        try {
            //获得连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(RABBITMQ_QUEUE,false,false,false,null);
            //每次只给消费者发一条消息,只有消费者回复确认以后才会继续发
            channel.basicQos(1);
            
             // 消费消息
            channel.basicConsume(RABBITMQ_QUEUE,false,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        //休眠2s
                        Thread.sleep(2000);
                        //控制台打印消息
                        System.out.println(new String(body));
                        //手动确认
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

消费者2

package org.best.work;

import com.rabbitmq.client.*;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    private static final String RABBITMQ_QUEUE="workQueue";
    public static void main(String[] args) {
        try {
            //获取连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(RABBITMQ_QUEUE,false,false,false,null);
            //每次只给消费者发一条消息,只有消费者回复确认以后才会继续发
            channel.basicQos(1);
            
            // 消费消息
            channel.basicConsume(RABBITMQ_QUEUE,false,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //控制台打印消息
                    System.out.println(new String(body));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
4.3 消息确认机制(ack)

当消息一旦被消费者接收,就会删除队列中的消息。但是RabbitMq如何知道消息被消费者接收了呢,这就涉及到了RabbitMq的ack机制,当消费者获取到消息后,会向RabbitMq发送回执ack,告诉消息已经被接收。

自动ack:消息一旦被接收,消费者自动发送ack手动ack:消息被接收后,不会发送ack,需要手动调用

我们开发中应该怎么选呢?自动ack的好处是速度快,但是也不安全,手动ack的好处是较为安全,但是降低了服务器的吞吐量,如果消息不太重要,可以选用自动ack,反之则使用手动ack。

basicAck

basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个例子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

baickNack

basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。

multiple:是否批量确认。

requeue:值为 true 消息将重新入队列

basicReject

basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。

requeue:值为 true 消息将重新入队列

4.4 发布订阅者模式(广播模式 fanout)

生产者

package org.best.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    //交换机名称不要以amq. 开头,否则会403异常
    private static final String RABBIT_EXCHANGE="best.fanout";
    private static final String MESSAGE="bestlee";
    public static void main(String[] args) {
        try {
            //创建连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(RABBIT_EXCHANGE,"fanout");
            //发送消息
            channel.basicPublish(RABBIT_EXCHANGE,"",null,MESSAGE.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

消费者

package org.best.fanout;

import com.rabbitmq.client.*;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    private static final String RABBIT_EXCHANGE="best.fanout";
    public static void main(String[] args) {
        try {
            //获得连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(RABBIT_EXCHANGE,"fanout");
            //创建一个不持久化,独占,自动删除的临时队列
            String queue = channel.queueDeclare().getQueue();
            //将队列和交换机绑定
            channel.queueBind(queue,RABBIT_EXCHANGE,"");
            //消费消息
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //打印消息
                    System.out.println(new String(body));
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
4.5 Routing消息模型(交换机类型: direct)

生产者

package org.best.routing;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.impl.AMQBasicProperties;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    //交换机名称
    private static final String RABBIT_EXCHANGE="best.routing";
    //路由键名称
    private static final String ROUTING_KEY="best";
    //消息
    private static final String MESSAGE="hello best";
    public static void main(String[] args) {
        try {
            //获得连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //绑定交换机
            channel.exchangeDeclare(RABBIT_EXCHANGE,"direct");
            //发送消息
            channel.basicPublish(RABBIT_EXCHANGE,ROUTING_KEY, null,MESSAGE.getBytes());
            System.out.println("sent发送消息"+"["+MESSAGE+"]成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

消费者

package org.best.routing;

import com.rabbitmq.client.*;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {

    private static final String RABBIT_EXCHANGE="best.routing";
    private static final String ROUTING_KEY="best";
    public static void main(String[] args) {
        try {
            //创建连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(RABBIT_EXCHANGE,"direct");
            //创建临时队列
            String queue = channel.queueDeclare().getQueue();
            //绑定队列,可同时绑定多个
            channel.queueBind(queue,RABBIT_EXCHANGE,ROUTING_KEY);
            //消费消息
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //打印消息
                    System.out.println(new String(body));
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
4.6 主题(通配符)模式(交换机类型: topic)

生产者

package org.best.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    //交换机名称
    private static final String RABBIT_EXCHANGE="best.topic";
    //路由键名称
    private static final String ROUTING_KEY="best.lee.yu";

    //消息
    private static final String MESSAGE="hello best";

    public static void main(String[] args) {
        try {
            //创建连接
            Connection connection = RabbitMqUtil.getConnection();
            //创建管道
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(RABBIT_EXCHANGE,"topic");
            //发送消息
            channel.basicPublish(RABBIT_EXCHANGE,ROUTING_KEY,null,MESSAGE.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

消费者

package org.best.topic;

import com.rabbitmq.client.*;
import org.best.util.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    //交换机名称
    private static final String RABBIT_EXCHANGE="best.topic";
    //路由键名称
    /
    public  void send(String exchange,String routingKey,Object message){
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }

}

消费者

package org.best.service.imp;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
//监听队列
@RabbitListener(queues = {"fanoutQueue1","fanoutQueue2"})
public class FanOutConsumer {

    //消费者方法
    @RabbitHandler
    public void consumer(String message) {
        System.out.println(message);
    }
}

测试类

package org.best.controller;

import org.best.config.FanoutOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {
    @Autowired
    private FanoutOrder fanoutOrder;

    @GetMapping("/test")
    public String test(){
       fanoutOrder.send("bestfanout",null,"5201313");
       return  "success";
    }
}

5.2 Direct模式

RabbitMqConfiguration

package org.best.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfiguration {
    @Value(value = "${rabbit.direct.exchangeName}")
    public  String directExchangeName;

    @Value(value = "${rabbit.direct.queueName1}")
    public String  directQueueName1;

    @Value(value = "${rabbit.direct.queueName2}")
    public String  directQueueName2;

//声明direct交换机
@Bean
public DirectExchange directExchange(){
    return new DirectExchange(directExchangeName,false,false);
}
//声明队列1
@Bean
public Queue directQueue1(){
    return new Queue(directQueueName1,false,false,false);
}
//声明队列2
@Bean
public Queue directQueue2(){
    return new Queue(directQueueName2,false,false,false);
}
//将direct交换机和队列通过路由键best绑定起来
@Bean
public Binding directBinding1(){
    return BindingBuilder
            .bind(directQueue1())
            .to(directExchange())
            .with("best");
}

//将direct交换机和队列通过路由键good绑定起来
@Bean
public Binding directBinding2(){
    return BindingBuilder
            .bind(directQueue2())
            .to(directExchange())
            .with("good");
}
}

生产者

package org.best.config;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DirectOrder {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    //生产者方法
    
    public  void send(String exchange,String routingKey,Object message){
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }

}

消费者1

package org.best.service.imp;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"directQueue1"})
public class DirectConsumer {

    @RabbitHandler
    public void handler(String message){
        System.out.println("directQueue1收到消息"+message);
    }
}

消费者2

package org.best.service.imp;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"directQueue2"})
public class DirectConsumer2 {

    @RabbitHandler
    public void handler(String message){
        System.out.println("directQueue2收到消息"+message);
    }
}

测试

package org.best.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {
    @Autowired
    private DirectOrder directOrder;
    


    @GetMapping("/direct")
    public String test1(String routingKey){
        //根据不同的路由键分发到不同的队列里
        directOrder.send("bestdirect",routingKey,"5201313");
        return  "success";
    }
}

5.3 Topic 模式

RabbitMqConfiguration

package org.best.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfiguration {
//声明交换机
@Bean
public TopicExchange topicExchange(){
    return new TopicExchange(topicExchangeName,false,false);
}
//声明队列
@Bean
public Queue topicQueue1(){
    return new Queue(topicQueueName1,false,false,false);
}

//声明队列
@Bean
public Queue topicQueue2(){
    return new Queue(topicQueueName2,false,false,false);
}
//绑定队列和交换机
@Bean
public Binding topicBinding1(){
    return BindingBuilder
            .bind(topicQueue1())
            .to(topicExchange())
            .with("best.*");
}
//绑定队列和交换机
@Bean
public Binding topicBinding2(){
    return BindingBuilder
            .bind(topicQueue2())
            .to(topicExchange())
            .with("best.#");
}
}

生产者

package org.best.config;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicOrder {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    //生产者方法
    
    public  void send(String exchange,String routingKey,Object message){
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }

}

消费者1

package org.best.service.imp;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"topicQueue1"})
public class TopicConsumer {

    @RabbitHandler
    public void handler(String message){
        System.out.println("topicQueue1收到消息"+message);
    }
}

消费者2

package org.best.service.imp;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"topicQueue2"})
public class TopicConsumer2 {

    @RabbitHandler
    public void handler(String message){
        System.out.println("topicQueue2收到消息"+message);
    }
}

测试

package org.best.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {
    @Autowired
    private TopicOrder topicOrder;
    


      @GetMapping("/topic")
    public String test2(String routingKey){
        //根据不同的路由键分发到不同的队列里
       topicOrder.send("besttopic",routingKey,"5201313");
        return  "success";
    }
}
6.RabbitMq高级特性

6.1 过期时间 TTL(Time To Live)

RabbitMQ可以对消息和队列设置TTL。RabbitMQ支持设置消息的过期时间,在消息发送的时候可以进行指定,每条消息的过期时间可以不同。RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,直到超过了队列的超时时间配置,那么消息会变成死信,自动清除。如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。当然也可以不设置TTL,不设置表示消息不会过期;如果设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息将被立即丢弃

对队列设置TTL

@Bean
public Queue ttlQueue(){
    //设置过期时间
    Map args = new HashMap<>();
    args.put("x-message-ttl",5000);//这里一定是int类型
    return new Queue("ttl.direct.queue",true,false,false,args);}

对消息设置TTL

package org.best.config;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutOrder {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    //生产者方法
    
    public  void send(String exchange,String routingKey,Object message,long times){
        rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration(String .valueOf(times));
                return message;	
            }
        });
    }

}

6.2 死信队列

当消息在一个队列中变成死信之后,他能被重新发送到另一个交换器中,这个交换器成为死信交换器,与该交换器绑定的队列称为死信队列。消息变成死信有下面几种情况:

消息被拒绝。消息过期队列达到最大长度

DLX也是一个正常的交换器,和一般的交换器没有区别,他能在任何的队列上面被指定,实际上就是设置某个队列的属性。当这个队列中有死信的时候,RabbitMQ会自动将这个消息重新发送到设置的交换器上,进而被路由到另一个队列,我们可以监听这个队列中消息做相应的处理。

死信队列有什么用?当发生异常的时候,消息不能够被消费者正常消费,被加入到了死信队列中。后续的程序可以根据死信队列中的内容分析当时发生的异常,进而改善和优化系统

RabbitMqConfiguration

package org.best.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration
public class RabbitMqConfiguration {




    //定义一个交换机
    @Bean
    public DirectExchange direct(){
        return new DirectExchange("exchangeTest",false,false);
    }

    @Bean
    public Queue directQueue(){
        //绑定死信交换机
        HashMap map = new HashMap<>();
        map.put("x-message-ttl",5000); //设置过期时间
        map.put("x-dead-letter-exchange","deadExchange");
        map.put("x-dead-letter-routing-key","dead");  //fanout不需要配置
        return new Queue("queueTest",false,false,false,map);
    }

    //绑定队列和交换器
    @Bean
    public Binding bindingTest(){
        return BindingBuilder
                .bind(directQueue())
                .to(direct())
                .with("best");
    }


    //定义死信交换机
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange("deadExchange",false,false);
    }
    //死信队列
    @Bean
    public Queue deadQueue(){
        return new Queue("deadQueue",false,false,false);
    }

    //将死信队列和死信交换机绑定
    @Bean
    public Binding bindingDead(){
        return BindingBuilder
                .bind(deadQueue())
                .to(deadExchange())
                .with("dead");
    }



}

死信队列消费者

package org.best.service.imp;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"deadQueue"})
public class DeadConsumer {

    @RabbitHandler
    public void handler(String message){
        System.out.println("死信队列收到消息======"+message);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771877.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号