
springCloud-rabbitMQ

1.Dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.Simple queue model

2.1 publisher
// rabbitTemplate is an injected RabbitTemplate
@GetMapping("/basic")
public void basic(){
    System.out.println("Basic message queue demo");
    rabbitTemplate.convertAndSend("simple.queue", "hello nihao");
}
2.2 consumer
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String simpleQueueMsg){
    System.out.println("Consumer received a message from simple.queue: " + simpleQueueMsg);
}
3.Work queue model

3.1 publisher
@GetMapping("/work")
public void work() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "work test == ";
    // Send 50 messages, one every 20 ms (about 50 per second)
    for (int i = 1; i <= 50; i++) {
        rabbitTemplate.convertAndSend(queueName, message + i + " ");
        TimeUnit.MILLISECONDS.sleep(20);
    }
}
3.2 consumer
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueWork(String simpleQueueMsg) throws InterruptedException {
    System.out.println("Consumer 1 received == " + simpleQueueMsg + LocalTime.now());
    Thread.sleep(20);  // consumer 1 handles about 50 messages per second
}

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueWork2(String simpleQueueMsg) throws InterruptedException {
    System.err.println("Consumer 2 received == " + simpleQueueMsg + LocalTime.now());
    Thread.sleep(200);  // consumer 2 handles about 5 messages per second
}
3.3 consumer application.yml

The other settings are the same, so the consumer's file is shown on its own here.

spring:
  rabbitmq:
    virtual-host: /
    username: root
    password: root
    host: 116.62.71.234
    port: 5672
    listener:
      simple:
        prefetch: 1   # fetch one message at a time so the slower consumer receives fewer
server:
  port: 9000
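
To see why `prefetch: 1` matters, here is a minimal sketch that models the two consumers from section 3.2 in plain Java. It is not broker or Spring code: `PrefetchSimulation`, `roundRobin` and `prefetchOne` are hypothetical names, and the dispatch rule used for the prefetch case (each message goes to whichever consumer becomes free first) is a simplifying assumption about how a prefetch of 1 behaves.

```java
// Simplified model, not broker code: compares how messages are split
// between a fast and a slow consumer under two dispatch policies.
final class PrefetchSimulation {
    static final int FAST_MS = 20;      // consumer 1: 20 ms per message
    static final int SLOW_MS = 200;     // consumer 2: 200 ms per message
    static final int INTERVAL_MS = 20;  // publisher sends one message every 20 ms

    /** Messages are dealt out alternately, regardless of how busy each consumer is. */
    static int[] roundRobin(int messages) {
        int[] handled = new int[2];
        for (int i = 0; i < messages; i++) {
            handled[i % 2]++;
        }
        return handled;
    }

    /** Assumed model of prefetch = 1: each message goes to the consumer that is free first. */
    static int[] prefetchOne(int messages) {
        int[] handled = new int[2];
        long fastFreeAt = 0;
        long slowFreeAt = 0;
        for (int i = 0; i < messages; i++) {
            long arrival = (long) i * INTERVAL_MS;
            if (fastFreeAt <= slowFreeAt) {
                // Fast consumer is free first (ties also go to it)
                fastFreeAt = Math.max(arrival, fastFreeAt) + FAST_MS;
                handled[0]++;
            } else {
                slowFreeAt = Math.max(arrival, slowFreeAt) + SLOW_MS;
                handled[1]++;
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] rr = roundRobin(50);
        int[] p1 = prefetchOne(50);
        System.out.println("alternating : fast=" + rr[0] + ", slow=" + rr[1]);
        System.out.println("prefetch = 1: fast=" + p1[0] + ", slow=" + p1[1]);
    }
}
```

Under this model the alternating policy gives each consumer 25 of the 50 messages, while the prefetch-style policy sends most of them to the fast consumer.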
4.Fanout

4.1 publisher
@GetMapping("/fanout")
public void publishToItcastFanoutExchangeWithFanout(){
    rabbitTemplate.convertAndSend("itcast.fanout", "", "hello fanout");
}
4.2 consumer
//-------------
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("fanout.queue1"),
        exchange = @Exchange(value = "itcast.fanout", type = ExchangeTypes.FANOUT)
))
public void listenItcastExchangeConsumer1(String fanoutExchangeMessage){
    System.out.println("Consumer 1 received: " + fanoutExchangeMessage);
}
//-------------
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("fanout.queue2"),
        exchange = @Exchange(value = "itcast.fanout", type = ExchangeTypes.FANOUT)
))
public void listenItcastFanoutExchangeConsumer2(String fanoutExchangeMessage){
    System.out.println("Consumer 2 received: " + fanoutExchangeMessage);
}
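
The fanout behaviour used above (every bound queue gets its own copy, and the routing key is ignored) can be illustrated with a toy in-memory model. This is only a sketch for intuition, not RabbitMQ client code; `FanoutModel` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange; it is not RabbitMQ client code.
final class FanoutModel {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; a fanout binding needs no routing key. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message to every bound queue and ignores the routing key. */
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Returns the messages currently held by the given queue. */
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        exchange.bind("fanout.queue1");
        exchange.bind("fanout.queue2");
        exchange.publish("", "hello fanout");
        System.out.println("fanout.queue1: " + exchange.messagesIn("fanout.queue1"));
        System.out.println("fanout.queue2: " + exchange.messagesIn("fanout.queue2"));
    }
}
```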
5.direct

5.1 publisher
@GetMapping("/blue")
public void sendDirectWithTheKeyBlue(){
    rabbitTemplate.convertAndSend("itcast.direct", "blue", "the key is blue");
}

@GetMapping("/yellow")
public void sendDirectWithTheKeyYellow(){
    rabbitTemplate.convertAndSend("itcast.direct", "yellow", "the key is yellow");
}

@GetMapping("/red")
public void sendDirectWithTheKeyRed(){
    rabbitTemplate.convertAndSend("itcast.direct", "red", "the key is red");
}
5.2 consumer
//---------------------
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("direct.queue1"),
        exchange = @Exchange(value = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"blue","red"}
))
public void directConsumer1(String msg){
    System.out.println("Consumer 1 received from the itcast.direct exchange: " + msg);
}

//--------------------
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("direct.queue2"),
        exchange = @Exchange(value = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"yellow","red"}
))
public void directConsumer2(String msg){
    System.out.println("Consumer 2 received from the itcast.direct exchange: " + msg);
}
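
With the bindings above, a direct exchange delivers a message only to queues whose binding key equals the routing key exactly. The following sketch models that rule in plain Java so the expected routing of `blue`, `yellow` and `red` can be checked without a broker; `DirectModel`, `bind` and `route` are hypothetical names, not part of any RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy in-memory model of direct-exchange routing; it is not RabbitMQ client code.
final class DirectModel {
    // queue name -> binding keys of that queue
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    /** Binds a queue to the exchange under one or more binding keys. */
    void bind(String queueName, String... keys) {
        bindings.computeIfAbsent(queueName, q -> new LinkedHashSet<>())
                .addAll(Arrays.asList(keys));
    }

    /** Returns the queues that have a binding key equal to the routing key. */
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : bindings.entrySet()) {
            if (entry.getValue().contains(routingKey)) {
                targets.add(entry.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        DirectModel exchange = new DirectModel();
        exchange.bind("direct.queue1", "blue", "red");
        exchange.bind("direct.queue2", "yellow", "red");
        for (String key : new String[] {"blue", "yellow", "red"}) {
            System.out.println(key + " -> " + exchange.route(key));
        }
    }
}
```

In this model `red` reaches both queues, while `blue` and `yellow` each reach only one.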
6.topic

6.1 publisher
@GetMapping("/chinaReg")
public void RouterChainReg() {
    String exchangeName = "itcast.queue";
    String routerKey = "china.weather";
    String msg = "China Reg";
    rabbitTemplate.convertAndSend(exchangeName, routerKey, msg);
}

@GetMapping("/RegNews")
public void RouterRegNews() {
    String exchangeName = "itcast.queue";
    String routerKey = "USA.news";
    String msg = "Reg news";
    rabbitTemplate.convertAndSend(exchangeName, routerKey, msg);
}

@GetMapping("/chinaNews")
public void RouterChainNews() {
    String exchangeName = "itcast.queue";
    String routerKey = "china.news";
    String msg = "China news";
    rabbitTemplate.convertAndSend(exchangeName, routerKey, msg);
}
6.2 consumer
//Queue1
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("topic.queue1"),
        exchange = @Exchange(value = "itcast.queue", type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void topicQueue1(String topicQueueMsg) {
    System.out.println("Consumer took this message from queue 1: " + topicQueueMsg);
}

//Queue2
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("topic.queue2"),
        exchange = @Exchange(value = "itcast.queue", type = ExchangeTypes.TOPIC),
        key = "#.news"
))
public void topicQueue2(String topicQueueMsg) {
    System.out.println("Consumer took this message from queue 2: " + topicQueueMsg);
}
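
The binding keys `china.#` and `#.news` rely on topic wildcards: in RabbitMQ, `#` stands for zero or more dot-separated words and `*` for exactly one word. As a rough illustration of that rule, here is a small matcher in plain Java. `TopicMatcher` is a hypothetical helper written for this example and is not how the broker implements matching.

```java
// Illustrative matcher for topic binding patterns; not the broker's implementation.
final class TopicMatcher {
    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;           // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                   // pattern still expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"china.weather", "USA.news", "china.news"};
        for (String key : keys) {
            System.out.println(key
                    + "  china.#=" + matches("china.#", key)
                    + "  #.news=" + matches("#.news", key));
        }
    }
}
```

With the three routing keys from 6.1, `china.news` matches both patterns and would therefore reach both queues, while `china.weather` matches only `china.#` and `USA.news` only `#.news`.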
7.message convert
  • Handling messages whose payload is not a String

7.1 publisher-pom

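<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
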
7.2 publisher-Bean
@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}
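
Some background on why this bean is worth declaring: without it, Spring AMQP's default `SimpleMessageConverter` sends a `Serializable` object such as a `Map` using JDK serialization, which is larger and not human-readable. The sketch below compares the two encodings using only the JDK. The JSON here is hand-built for a flat map purely for illustration (no escaping, no nesting) and stands in for what Jackson would produce; `PayloadSizeDemo` and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: compares JDK serialization with a hand-built JSON string.
final class PayloadSizeDemo {
    /** Serializes the value with the JDK's built-in Java serialization. */
    static byte[] javaSerialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Minimal JSON for a flat map of strings and numbers; performs no escaping. */
    static String toJson(Map<String, Object> map) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(entry.getKey()).append("\":");
            Object value = entry.getValue();
            if (value instanceof Number) {
                json.append(value);
            } else {
                json.append('"').append(value).append('"');
            }
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, Object> liuyan = new LinkedHashMap<>();
        liuyan.put("name", "\u67f3\u5ca9"); // the name used in the publisher example, written as Unicode escapes
        liuyan.put("age", 21);
        String json = toJson(liuyan);
        System.out.println("JDK serialization: " + javaSerialize(liuyan).length + " bytes");
        System.out.println("JSON: " + json.getBytes(StandardCharsets.UTF_8).length + " bytes");
    }
}
```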
7.3 publisher
@GetMapping("/messageConverter")
    public void messageConverter(){
        Map<String, Object> liuyan = new HashMap<>();
        liuyan.put("name","柳岩");
        liuyan.put("age", 21);
        rabbitTemplate.convertAndSend("simple.queue",liuyan);
    }
7.4 consumer-pom

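<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
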
7.5 consumer-Bean
@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}
7.6 consumer
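// Sketch of a listener for the Map payload (the method name is illustrative)
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMap(Map<String, Object> msg){
    System.out.println("Consumer received a Map from simple.queue: " + msg);
}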

When reposting, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/389164.html