我们简单的入门一下我们的Rabbit,使用java完成一个生产者,一个消费者,完成先关的操作。
1、工作原理图 2、创建一个Maven项目,导入对应的依赖3、大概流程示意 4、生产者代码com.rabbitmq amqp-client 5.8.0 commons-io commons-io 2.6
public class Producer {
// 队列的名字
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
// 1、创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接的主机信息 主机ip,用户名,密码
connectionFactory.setHost("192.168.115.128");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123");
// 3、获取一个连接
Connection connection = connectionFactory.newConnection();
// 4、获取一个信道
Channel channel = connection.createChannel();
// 5、声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 6、声明发送的信息
String message = "hello,world";
// 7、发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("message 发送结束");
}
}
5、生产者中代码调用方法解释
5.1、声明队列方法queueDeclare()
该方法是生产者用来声明一个队列,如果该队列不存在就创建一个对列,存在的话就直接使用
方法参数解释:
1、队列的名字
2、队列里面的消息是否支持持计化
3、设置该队列,是否可以供对个消费者消费
4、是否自动删除消息
5、其他参数
// 5、声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
5.2、执行发送消息的方法basicPublish()
basicPublish()方法是生产者,用来推送消息的使用的
参数解释:
1、使用那个交换机
2、推送到那个队列的l路由key
3、其他参数的信息
4、发送消息的消息体
// 7、发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("message 发送结束");
6、消费者代码
public class Consumer {
// 队列的名字
public static final String QUEUE_NAME = "hello";
// 接受消息
public static void main(String[] args) throws Exception {
// 1、创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接的主机信息 主机ip,用户名,密码
connectionFactory.setHost("192.168.115.128");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123");
// 3、获取一个连接
Connection connection = connectionFactory.newConnection();
// 4、获取一个信道
Channel channel = connection.createChannel();
// 5、消费者消费信道
//声明 接受消息的回调函数时接口对象
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(message.getBody());
};
// 取消消息时的回调函数时接口对象
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
6.1、回调函数DeliverCallback
我们消费消息时,我们需要从消息队列中取到对应的消息,但是存在如何消费消息的问题,这个回调函数就是用来完成对消息的如何消费的。
//声明 接受消息的回调函数时接口对象
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
6.2、回调函数CancelCallback
消息消费取消时的回调函数
// 取消消费消息时的回调函数时接口对象
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
6.3、basicConsume()参数解释
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
7、演示
启动我们的生产者代码,启动我们的消费者代码
查看消息发送与消息的消费。



