栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq使用教程(rabbitmq详解)

rabbitmq使用教程(rabbitmq详解)

RabbitMQ五种工作模式 1、简单模式

简单模式的特点是一个生产者,一个队列,一个消费者,这种模式不需要进行任何的交换机的binding绑定。

工具类:

public class RabbitMqUtils {
  //得到一个连接的 channel
  public static Channel getChannel() throws Exception {
    //创建一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("host"); //按照实际情况填写自己的ip
    factory.setUsername("admin"); //按照实际情况填写自己的用户名
    factory.setPassword("pwd"); //按照实际情况填写自己的密码
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    return channel;
  }
}

生产者:

public class Producer {
  // 队列名称
  public static final String QUEUE_NAME="hello";
  // 发送消息
  public static void main(String[] args) throws IOException, TimeoutException {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 工厂ip 连接rabbitmq的队列
    factory.setHost("host");
    // 用户名
    factory.setUsername("admin");
    // 密码
    factory.setPassword("pwd");
    // 创建连接
    Connection connection = factory.newConnection();
    // 获取信道
    Channel channel = connection.createChannel();
    
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 发消息
    String message = "hello world"; // 初次使用
    
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println("消息发送完毕");
  }
}

消费者:

public class Consumer {
  // 队列名称
  public static final String QUEUE_NAME="hello";

  // 接收消息
  public static void main(String[] args) throws Exception {
    //创建连接工厂
    Channel channel = RabbitMqUtils.getChannel();
    // 声明 接收消息
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println(new String(message.getBody()));
    };

    //取消消息时的回调
    CancelCallback cancelCallback = (consumerTag)->{
      System.out.println("消费消息被中断");
    };
    
    channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
  }
}
2、Work模式

work模式的特点是一个生产这,一个队列,多个消费者,每个消费者获取到的消息是唯一的。

生产者:

public class Task01 {
  private static final String QUEUE_NAME = "hello";
  public static void main(String[] args) throws Exception {
    try {
      Channel channel = RabbitMqUtils.getChannel();
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      //从控制台当中接受信息
      Scanner scanner = new Scanner(System.in);
      while (scanner.hasNext()) {
        String message = scanner.next();
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("发送消息完成:" + message);
      }
    } catch (Exception e) {
        e.printStackTrace();
    }
  }
}

消费者1:

public class Worker01 {
  // 队列的名称
  public static final String QUEUE_NAME="hello";
  //接收消息
  public static void main(String[] args) throws Exception {
    //获取信道
    Channel channel = RabbitMqUtils.getChannel();
    // 声明 接收消息
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C1接收到的消息:" + new String(message.getBody()));
    };
    //取消消息时的回调
    CancelCallback cancelCallback = (consumerTag)->{
      System.out.println("消费消息被中断");
    };
    System.out.println("C1等待接收消息");
    channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);
  }
}

消费者2:

public class Worker02 {
  // 队列的名称
  public static final String QUEUE_NAME="hello";
  //接收消息
  public static void main(String[] args) throws Exception {
    //获取信道
    Channel channel = RabbitMqUtils.getChannel();
    // 声明 接收消息
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C2接收到的消息:" + new String(message.getBody()));
    };
    //取消消息时的回调
    CancelCallback cancelCallback = (consumerTag)->{
      System.out.println("消费消息被中断");
    };
    System.out.println("C2等待接收消息");
    channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);


  }
}
3、Fanout发布订阅模式

这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。生产者发送的消息会被多个消费者获取。

生产者:

public class EmitLog {
  // 交换机的名称
  private static final String EXCHANGE_NAME="logs";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    Scanner scan = new Scanner(System.in);
    while (scan.hasNext()){
      String message = scan.next();
      channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
      System.out.println("生产者发出消息:"+message);
    }
  }
}

消费者1

public class ReceiveLogs01 {
  // 交换机的名称
  private static final String EXCHANGE_NAME="logs";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // 声明一个临时队列
    String queue = channel.queueDeclare().getQueue();
    // 绑定
    channel.queueBind(queue,EXCHANGE_NAME,"",null);
    System.out.println("C1等待接收消息:");
    // 接收消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C1控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    };
    // 消费消息
    channel.basicConsume(queue,true,deliverCallback,(consumerTag)->{
      System.out.println("C1取消接收消息");
    });
  }
}

消费者2:

public class ReceiveLogs02 {
  // 交换机的名称
  private static final String EXCHANGE_NAME="logs";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // 声明一个临时队列
    String queue = channel.queueDeclare().getQueue();
    // 绑定
    channel.queueBind(queue,EXCHANGE_NAME,"",null);
    System.out.println("C2等待接收消息:");
    // 接收消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C2控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    };
    // 消费消息
    channel.basicConsume(queue,true,deliverCallback,(consumerTag)->{
      System.out.println("C2取消接收消息");
    });
  }
}
4、Direct路由模式

路由模式的特点是:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定绑定key,当绑定key和路由key完全匹配时,消费者才能消费消息,一个队列可以绑定多个key。

生产者:

public class DirectLogs {
  // 交换机的名称
  private static final String EXCHANGE_NAME="direct_logs";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    Scanner scan = new Scanner(System.in);
    while (scan.hasNext()){
      String message = scan.next();
      // 修改routingKey参数 即可想要发送到指定的消费方
      channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes());
      System.out.println("生产者发出消息:"+message);
    }
  }
}

消费者1:

public class ReceiveLogsDirect01 {
  // 交换机名称
  private static final String EXCHANGE_NAME = "direct_logs";
  // 队列名称
  private static final String QUEUE_NAME = "console";
  public static void main(String[] args) throws Exception {
    // 获取信道
    Channel channel = RabbitMqUtils.getChannel();
    //声明一个交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    // 声明一个队列
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    // 绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
    // 消费消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C1控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    };
    //消费消息
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
      System.out.println("C1取消接收消息");
    });
  }
}

消费者2:

public class ReceiveLogsDirect02 {
  // 交换机名称
  private static final String EXCHANGE_NAME = "direct_logs";
  // 队列名称
  private static final String QUEUE_NAME = "console";
  public static void main(String[] args) throws Exception {
    // 获取信道
    Channel channel = RabbitMqUtils.getChannel();
    //声明一个交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    // 声明一个队列
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    // 绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
    // 消费消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C2控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    };
    //消费消息
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
      System.out.println("C2取消接收消息");
    });
  }
}
5、Topic通配符模式

该模式的特点是:用通配符进行匹配,#匹配0个或多个单词,*匹配一个单词。

生产者:

public class TopicLogs {
  // 交换机的名称
  private static final String EXCHANGE_NAME="topic_logs";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    Scanner scan = new Scanner(System.in);
    while (scan.hasNext()){
      String message = scan.next();
      // 修改routingKey参数 即可想要发送到指定的消费方
       channel.basicPublish(EXCHANGE_NAME,"quick.orange.fox",null,message.getBytes());
      System.out.println("生产者发出消息:"+message);
    }
  }
}

消费者1:

public class ReceiveLogsTopic01 {
  // 交换机名称
  private static final String EXCHANGE_NAME = "topic_logs";
  // 队列名称
  private static final String QUEUE_NAME = "q1";
  public static void main(String[] args) throws Exception {
    // 获取信道
    Channel channel = RabbitMqUtils.getChannel();
    //声明一个交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    // 声明一个队列
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    // 绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
    // 消费消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C1控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
      System.out.println("接收队列:"+QUEUE_NAME+"  绑定键:"+message.getEnvelope().getRoutingKey());
    };
    //消费消息
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
      System.out.println("C1取消接收消息");
    });
  }
}

消费者2:

public class ReceiveLogsTopic02 {
  // 交换机名称
  private static final String EXCHANGE_NAME = "topic_logs";
  // 队列名称
  private static final String QUEUE_NAME = "q2";
  public static void main(String[] args) throws Exception {
    // 获取信道
    Channel channel = RabbitMqUtils.getChannel();
    //声明一个交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    // 声明一个队列
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    // 绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
    // 消费消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C2控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
      System.out.println("接收队列:"+QUEUE_NAME+"  绑定键:"+message.getEnvelope().getRoutingKey());
    };
    //消费消息
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
      System.out.println("C2取消接收消息");
    });
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/772111.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号