这里的“P”是生产者,“C”是消费者。中间的框是一个队列-RabbitMQ,代表使用者保留的消息缓冲区。
二、生产者代码实现2.1 创建一个工程,pom依赖
com.rabbitmq amqp-client5.14.0 commons-io commons-io2.6
2.2 生产者代码
public class Producer {
private final static String QUEUE_NAME="hello";
public static void main(String[] args) {
//创建一个链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
try{
//channel实现了自动close接口,不需要显示关闭
Connection connection=factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="Hello world";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完毕");
}catch (Exception e)
{
e.printStackTrace();
}
}
}
2.3 查看消息队列
三、 消费者的代码实现3.1 代码实现
public class Concumer {
private final static String QUEUE_NAME="hello";
public static void main(String[] args) {
//创建一个链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
try{
//channel实现了自动close接口,不需要显示关闭
Connection connection=factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
System.out.println("消费的消息:"+message);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}catch (Exception e)
{
e.printStackTrace();
}
}
}
3.2 查看消息队列
可以看到消息已经被消费



