1:jdk1.8
2:构建一个maven工程
3:导入rabbitmq的maven依赖
4:启动rabbitmq-server服务
5:定义生产者
6:定义消费者
7:观察消息的在rabbitmq-server服务中的过程
2、RabbitMQ服务com.rabbitmq amqp-client 5.10.0
docker start myrabbit3、定义生产者
package com.sky.rabbitmq.simple;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args){
// 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过RabbitMQ遵循的是amqp
// ip port
// 1:创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// ip
connectionFactory.setHost("47.112.138.176");
// 端口不是管理界面的端口
connectionFactory.setPort(5672);
// 登录账号
connectionFactory.setUsername("admin");
// 登录密码
connectionFactory.setPassword("admin");
// 访问路径根节点
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
// 2:创建连接connection
try {
connection = connectionFactory.newConnection("生产者");
// 3:通过连接获取通道channel
channel = connection.createChannel();
// 4:通过创建交换机,声明队列,绑定关系,路由key,发送消息和接收消息
String queueName = "queue1";
channel.queueDeclare(queueName,false,false,true,null);
// 5:准备消息内容
String message = "HelloWorld";
// 6:发送消息给队列queue 这里空字符串,是一个交换机
channel.basicPublish("",queueName,null,message.getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
// 7:关闭连接
if (channel!=null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8:关闭通道
if (connection!= null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
4、消费者
package com.sky.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过RabbitMQ遵循的是amqp
// ip port
// 1:创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// ip
connectionFactory.setHost("47.112.138.176");
// 端口不是管理界面的端口
connectionFactory.setPort(5672);
// 登录账号
connectionFactory.setUsername("admin");
// 登录密码
connectionFactory.setPassword("admin");
// 访问路径根节点
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
// 2:创建连接connection
try {
connection = connectionFactory.newConnection("消费者");
// 3:通过连接获取通道channel
channel = connection.createChannel();
// 4:通过创建交换机,声明队列,绑定关系,路由key,发送消息和接收消息
String queueName = "queue1";
channel.queueDeclare(queueName,false,false,true,null);
// 5:准备消息内容
String message = "HelloWorld";
// 6:发送消息给队列queue
channel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到消息是" + new String(delivery.getBody(),"UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("接收消息失败。。。");
}
});
System.out.println("开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
// 7:关闭连接
if (channel!=null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8:关闭通道
if (connection!= null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
5、测试
执行发送
运行生产者
接收关闭consumer后队列删除
6、图解



