1、将获取连接代码封装:
public class ConnUtils {
public Connection getConnection() throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置参数
// ip设置:默认值也是localhost
factory.setHost("localhost");
// port设置 默认值:5672
factory.setPort(5672);
// 虚拟机设置 默认值:/
factory.setVirtualHost("/echo");
// 用户名设置 默认 guest
factory.setUsername("echo");
// 密码设置 默认 guest
factory.setPassword("echo");
// 3、创建连接 Connection
Connection connection = factory.newConnection();
return connection;
}
}
1、编写生产者代码:
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、获取连接对象
ConnUtils cs = new ConnUtils();
Connection connection = cs.getConnection();
// 2、创建频道
Channel channel = connection.createChannel();
// 3、声明队列
channel.queueDeclare("work_queue", true, false, false, null);
// 4、发送消息
for (int i = 1; i <= 30; i++) {
// 发送信息
String message = i + "工作模式" ;
channel.basicPublish("", "work_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);
}
// 5、关闭资源
channel.close();
connection.close();
}
}
2、编写消费者01代码:
public class Consumer_WorkQueue1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、获取连接
ConnUtils cs = new ConnUtils();
Connection connection = cs.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、创建队列
channel.queueDeclare("work_queue", true, false, false, null);
// 4、接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
// 5、监听消息
channel.basicConsume("work_queue",true,consumer);
}
}
3、编写消费者02代码:
public class Consumer_WorkQueue2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、获取连接
ConnUtils cs = new ConnUtils();
Connection connection = cs.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、创建队列
channel.queueDeclare("work_queue", true, false, false, null);
// 4、接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
// 5、监听消息
channel.basicConsume("work_queue",true,consumer);
}
}
2、测试
1、运行消费者01和02代码 2、运行生产者代码
01:控制台: body:1工作模式 body:3工作模式 body:5工作模式 body:7工作模式 body:9工作模式 body:11工作模式 body:13工作模式 body:15工作模式 body:17工作模式 body:19工作模式 body:21工作模式 body:23工作模式 body:25工作模式 body:27工作模式 body:29工作模式
02:控制台: body:2工作模式 body:4工作模式 body:6工作模式 body:8工作模式 body:10工作模式 body:12工作模式 body:14工作模式 body:16工作模式 body:18工作模式 body:20工作模式 body:22工作模式 body:24工作模式 body:26工作模式 body:28工作模式 body:30工作模式
项目代码链接:https://github.com/Mbm7280/rabbitmq_demo



