pom.xml:
4.0.0 org.springframework.boot spring-boot-starter-parent 2.5.6 com.zoomy rabbitmq 0.0.1-SNAPSHOT rabbitmq Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test com.rabbitmq amqp-client 5.10.0 org.springframework.boot spring-boot-maven-plugin
一、简单模式simple
Producer:
package com.zoomy.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("******");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接Connection
connection = connectionFactory.newConnection("生产者");
// 3.通过连接获取通道Channel
channel = connection.createChannel();
// 4.通过创建交换机,声明队列,绑定关系,路由Key,发送消息,和接收消息
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
// 5.准备消息内容
String message = "Hello rabbitmq!!!";
// 6.发送消息给队列queue
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7.关闭连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8.关闭通道
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
Consumer:
package com.zoomy.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
// 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("*******");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接Connection
connection = connectionFactory.newConnection("生产者");
// 3.通过连接获取通道Channel
channel = connection.createChannel();
// 4.通过创建交换机,声明队列,绑定关系,路由Key,发送消息,和接收消息
channel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到信息是" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("接受消息失败");
}
}
);
System.out.println("开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7.关闭连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8.关闭通道
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
首先执行生产者producer得main方法:
登录rabbitmq:看到queue1生产了一条队列信息
然后使用消费者消费掉:
看到ready变成0,被成功消费掉
二、fanout
1.复制simple模式的producer,consumer
2.rabbitmq添加交换机,创建队列,绑定关系
3.修改producer
package com.zoomy.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("******");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接Connection
connection = connectionFactory.newConnection("生产者");
// 3.通过连接获取通道Channel
channel = connection.createChannel();
// 4.准备消息内容
String message = "Hello rabbitmq!!!";
//5.准备交换机
String exchange = "fanout-exchange";
// 6.定义routing key
String routekey = "";
// 7.指定交换机类型
String type = "fanout";
//8.发送消息给队列queue
channel.basicPublish(exchange, routekey, null, message.getBytes());
System.out.println("消息发送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 9.关闭连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 10.关闭通道
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
4.修改consumer
package com.zoomy.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static Runnable runnable = new Runnable() {
@Override
public void run() {
// 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("********");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//获取队列名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接Connection
connection = connectionFactory.newConnection("生产者");
// 3.通过连接获取通道Channel
channel = connection.createChannel();
// 4.通过创建交换机,声明队列,绑定关系,路由Key,发送消息,和接收消息
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(queueName + "收到信息是" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("接受消息失败");
}
}
);
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7.关闭连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8.关闭通道
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
//创建三个消费者
public static void main(String[] args) {
new Thread(runnable, "queue1").start();
new Thread(runnable, "queue2").start();
new Thread(runnable, "queue3").start();
}
}
5.测试:
运行producer
队列信息都增加一条
运行consumer,发现队列123都进行了消费
三、direct
1、创建交换机,绑定关系
2.复制fanout的producer,和consumer,给producer做修改
package com.zoomy.rabbitmq.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("*****");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接Connection
connection = connectionFactory.newConnection("生产者");
// 3.通过连接获取通道Channel
channel = connection.createChannel();
// 4.准备消息内容
String message = "Hello rabbitmq!!!";
//5.准备交换机
String exchange = "direct-exchange";
// 6.定义routing key
String routekey = "email";
// 7.指定交换机类型
String type = "direct";
//8.发送消息给队列queue
channel.basicPublish(exchange, routekey, null, message.getBytes());
System.out.println("消息发送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 9.关闭连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 10.关闭通道
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
运行producer
运行consumer,看到routing key是 email的队列1 ,3 进行了消费
四、topic模式
复制direct代码
1、创建交换机,并绑定关系
2.修改producer
package com.zoomy.rabbitmq.topics;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("121.41.211.173");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接Connection
connection = connectionFactory.newConnection("生产者");
// 3.通过连接获取通道Channel
channel = connection.createChannel();
// 4.准备消息内容
String message = "Hello rabbitmq!!!";
//5.准备交换机
String exchange = "topic-exchange";
// 6.定义routing key
String routekey = "com.order.test.xxxx";
// 7.指定交换机类型
String type = "topic";
//8.发送消息给队列queue
channel.basicPublish(exchange, routekey, null, message.getBytes());
System.out.println("消息发送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 9.关闭连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 10.关闭通道
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
3.运行producer
4.运行consumer,队列1 3成功消费
五、上面都是通过图形化界面进行创建交换机,下面是纯代码实现
1.创建producer
package com.zoomy.rabbitmq.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("121.41.211.173");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接Connection
connection = connectionFactory.newConnection("生产者");
// 3.通过连接获取通道Channel
channel = connection.createChannel();
// 4.准备消息内容
String message = "Hello rabbitmq!!!";
//5.准备交换机
String exchange = "direct_message_exchange";
// 6.定义routing key
String routekey = "order";
// 7.指定交换机类型
String type = "direct";
//8.声明交换机,true是 持久化,交换机不会随着服务器重启造成丢失
channel.exchangeDeclare(exchange,type,true);
//9.声明队列
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);
//10.绑定队列和交换机关系
channel.queueBind("queue5",exchange,"order");
channel.queueBind("queue6",exchange,"order");
channel.queueBind("queue7",exchange,"course");
//8.发送消息给队列queue
channel.basicPublish(exchange, routekey, null, message.getBytes());
System.out.println("消息发送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 9.关闭连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 10.关闭通道
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
2.运行consumer
六、工作模式work(轮询分发模式,公共分发模式)
1.procuder
package com.zoomy.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("121.41.211.173");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接Connection
connection = connectionFactory.newConnection("生产者");
// 3.通过连接获取通道Channel
channel = connection.createChannel();
//4.准备发送消息的内容
for (int i = 0; i < 20; i++) {
//消息内容
String msg = "rabbitmq" + i;
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("消息发送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 9.关闭连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 10.关闭通道
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
2.Work1,Work2
package com.zoomy.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work1 {
public static void main(String[] args) {
// 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("121.41.211.173");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//获取队列名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接Connection
connection = connectionFactory.newConnection("消费者-Work1");
// 3.通过连接获取通道Channel
channel = connection.createChannel();
//4定义接受消息的回调
Channel finalChannel = channel;
// finalChannel.basicQos(1);
// 5.通过创建交换机,声明队列,绑定关系,路由Key,发送消息,和接收消息
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("Work1-收到信息是" + new String(message.getBody(), "utf-8"));
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("接受消息失败");
}
}
);
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 6.关闭连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 7.关闭通道
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
package com.zoomy.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work2 {
public static void main(String[] args) {
// 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("121.41.211.173");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//获取队列名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接Connection
connection = connectionFactory.newConnection("消费者-Work2");
// 3.通过连接获取通道Channel
channel = connection.createChannel();
//4定义接受消息的回调
Channel finalChannel = channel;
// finalChannel.basicQos(1);
// 5.通过创建交换机,声明队列,绑定关系,路由Key,发送消息,和接收消息
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("Work2-收到信息是" + new String(message.getBody(), "utf-8"));
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("接受消息失败");
}
}
);
System.out.println("Work2-开始接受消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 6.关闭连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 7.关闭通道
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
3.分别运行work1,work2
4.运行producer
5.看到是轮询



