- 1. 创建SpringBoot项目,引入相关依赖
- 2. 为了减少重复代码书写,编写连接工厂创建信道的工具类
- 3. 编写生产者代码
- 4. 编写消费者代码
- 5. 运行代码测试
2. 为了减少重复代码书写,编写连接工厂创建信道的工具类org.springframework.boot spring-boot-starter-amqp commons-io commons-io 2.6 org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine org.springframework.amqp spring-rabbit-test test
public class RabbitMqUtils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("120.24.192.216");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
3. 编写生产者代码
public class Task01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制台中接收信息发送
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
}
}
}
4. 编写消费者代码
public class Worker01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//消息的接收
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("接收到的消息:"+new String(message.getBody()));
};
//消息接收被取消时执行的内容
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息者取消消息消费接口回调逻辑");
};
System.out.println("C2等待接收消息......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
5. 运行代码测试
多进程运行两个消费者代码,首先设置idea可以多进程运行,选中Allow multiple instances
运行生产者代码,并手动输入需要发送的消息
观察消费者接收消息的情况



