1 GitHub:
2 rabbitmq04:使用 Channel 创建队列、交换机,发送消息以及消费消息
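pom.xml:
<project>
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.yzm</groupId>
        <artifactId>rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <artifactId>rabbitmq04</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>rabbitmq04</name>
    <description>Demo project for Spring Boot</description>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>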
项目结构
application.yml:无需配置(RabbitMQ 的连接参数直接写在下面的 RabbitConfig 中)
package com.yzm.rabbitmq04.config;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.stereotype.Component;
/**
 * Holds the RabbitMQ names and the connection settings used by this demo module.
 */
@Component
public class RabbitConfig {

    /** Name of the queue used by the plain producer/consumer example. */
    public static final String QUEUE = "queue-a";

    /**
     * Opens a new connection to the RabbitMQ broker at 127.0.0.1:5672 with the guest account.
     *
     * <p>A fresh connection is created on every call; the caller owns it and is responsible
     * for closing it.
     *
     * @return an open connection, never {@code null}
     * @throws IllegalStateException if the connection cannot be established; the original
     *         failure is attached as the cause
     */
    public static Connection getConnection() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        try {
            return factory.newConnection();
        } catch (java.io.IOException | java.util.concurrent.TimeoutException e) {
            // Fail here with the real cause instead of returning null and letting the
            // caller hit an unexplained NullPointerException on createChannel().
            throw new IllegalStateException("Cannot connect to RabbitMQ at 127.0.0.1:5672", e);
        }
    }
}
生产者
package com.yzm.rabbitmq04.service;
import com.rabbitmq.client.*;
import com.yzm.rabbitmq04.config.RabbitConfig;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * Publishes demo messages to RabbitMQ through the raw client {@code Channel} API.
 */
@Component
public class SenderService {

    /**
     * Declares the durable queue {@link RabbitConfig#QUEUE} and publishes ten persistent
     * text messages to it via the default exchange (empty exchange name, queue name as
     * routing key).
     *
     * <p>Runs 5 seconds after startup and then with a one-hour delay between runs.
     *
     * @throws IOException if declaring the queue, publishing, or closing fails
     * @throws TimeoutException if closing the channel times out
     */
    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void producerA() throws IOException, TimeoutException {
        // try-with-resources releases the channel and then the connection even when
        // declaring or publishing throws, so a failed run does not leak a connection.
        try (Connection connection = RabbitConfig.getConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RabbitConfig.QUEUE, true, false, false, null);
            for (int i = 1; i <= 10; i++) {
                String message = "Hello World!...... " + i;
                System.out.println(" [ Sent ] 消息内容 " + message);
                // Encode explicitly as UTF-8: the consumers decode the body as UTF-8.
                channel.basicPublish("", RabbitConfig.QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes(StandardCharsets.UTF_8));
            }
        }
    }
}
消费者,自动确认消息
package com.yzm.rabbitmq04.service;
import com.rabbitmq.client.*;
import com.yzm.rabbitmq04.config.RabbitConfig;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Consumes demo messages from RabbitMQ through the raw client {@code Channel} API.
 */
@Component
public class ReceiverService {

    /**
     * Registers an auto-acknowledging consumer on {@link RabbitConfig#QUEUE} that prints
     * each message body and then pauses for 500 ms.
     *
     * <p>The connection and channel are not closed here; the registered consumer keeps
     * using them after this method returns.
     *
     * @throws IOException if the channel cannot be created or the consumer cannot be registered
     */
    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
    public void consumerA() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        // Second argument true = automatic acknowledgement.
        channel.basicConsume(RabbitConfig.QUEUE, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(" [ received@A_1 ] 消息内容 : " + new String(body, StandardCharsets.UTF_8) + "!");
                try {
                    // Simulated processing time.
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the thread's owner can still see
                    // the interruption instead of having it silently swallowed.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
运行结果:
修改消费者,手动确认消息
/**
 * Registers a manually-acknowledging consumer on {@link RabbitConfig#QUEUE} that prints
 * each message body, pauses for one second, and then acknowledges that single delivery.
 *
 * <p>The connection and channel are not closed here; the registered consumer keeps
 * using them after this method returns.
 *
 * @throws IOException if the channel cannot be created or the consumer cannot be registered
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
public void consumerA_2() throws IOException {
    Connection connection = RabbitConfig.getConnection();
    Channel channel = connection.createChannel();
    // Second argument false = automatic acknowledgement disabled.
    channel.basicConsume(RabbitConfig.QUEUE, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(" [ received@A_2 ] 消息内容 : " + new String(body, StandardCharsets.UTF_8) + "!");
            try {
                // Simulated processing time.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
            }
            // Acknowledge only this delivery (multiple = false).
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });
}
4 fanout 交换机注释
运行结果:
/** Name of the fanout exchange used by the fanout example. */
public static final String FANOUT_EXCHANGE = "fanout-exchange-a";
/** Name of the first queue bound to the fanout exchange. */
public static final String FANOUT_QUEUE_A = "fanout-queue-a";
/** Name of the second queue bound to the fanout exchange. */
public static final String FANOUT_QUEUE_B = "fanout-queue-b";
生产者
/**
 * Declares the durable fanout exchange and its two durable queues, binds both queues
 * with an empty routing key, and publishes ten persistent text messages to the exchange.
 *
 * <p>Runs 5 seconds after startup and then with a one-hour delay between runs.
 *
 * @throws IOException if a declare, bind, publish, or close operation fails
 * @throws TimeoutException if closing the channel times out
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
public void producerB() throws IOException, TimeoutException {
    // try-with-resources releases the channel and then the connection even when a
    // broker call throws, so a failed run does not leak a connection.
    try (Connection connection = RabbitConfig.getConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(RabbitConfig.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);
        // Declare the queues.
        channel.queueDeclare(RabbitConfig.FANOUT_QUEUE_A, true, false, false, null);
        channel.queueDeclare(RabbitConfig.FANOUT_QUEUE_B, true, false, false, null);
        // Bind the queues to the exchange; no routing key is needed, so it is left empty.
        channel.queueBind(RabbitConfig.FANOUT_QUEUE_A, RabbitConfig.FANOUT_EXCHANGE, "");
        channel.queueBind(RabbitConfig.FANOUT_QUEUE_B, RabbitConfig.FANOUT_EXCHANGE, "");
        for (int i = 1; i <= 10; i++) {
            String message = "Hello World!...... " + i;
            System.out.println(" [ Sent ] 消息内容 " + message);
            // Encode explicitly as UTF-8: the consumers decode the body as UTF-8.
            channel.basicPublish(RabbitConfig.FANOUT_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(StandardCharsets.UTF_8));
        }
    }
}
消费者B_1、B_2
/**
 * Starts an auto-acknowledging consumer on {@link RabbitConfig#FANOUT_QUEUE_A} that
 * prints each message with the B_1 prefix and pauses 500 ms per message.
 *
 * @throws IOException if the channel cannot be created or the consumer cannot be registered
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
public void consumerB_1() throws IOException {
    final Channel consumerChannel = RabbitConfig.getConnection().createChannel();
    final DefaultConsumer printer = getConsumer(consumerChannel, " [ received@B_1 ] 消息内容 : ", 500);
    consumerChannel.basicConsume(RabbitConfig.FANOUT_QUEUE_A, true, printer);
}
/**
 * Starts an auto-acknowledging consumer on {@link RabbitConfig#FANOUT_QUEUE_B} that
 * prints each message with the B_2 prefix and pauses 1000 ms per message.
 *
 * @throws IOException if the channel cannot be created or the consumer cannot be registered
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
public void consumerB_2() throws IOException {
    final Channel consumerChannel = RabbitConfig.getConnection().createChannel();
    final DefaultConsumer printer = getConsumer(consumerChannel, " [ received@B_2 ] 消息内容 : ", 1000);
    consumerChannel.basicConsume(RabbitConfig.FANOUT_QUEUE_B, true, printer);
}
/**
 * Builds a consumer that prints every delivered body (decoded as UTF-8) behind the given
 * prefix and then sleeps to simulate processing time.
 *
 * @param channel the channel the consumer is attached to
 * @param s       prefix printed in front of each message body
 * @param i       pause after each message, in milliseconds
 * @return a new consumer bound to {@code channel}
 */
private DefaultConsumer getConsumer(Channel channel, String s, int i) {
    return new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(s + new String(body, StandardCharsets.UTF_8) + "!");
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        }
    };
}
5 direct 交换机
运行结果:
/** Name of the direct exchange used by the direct example. */
public static final String DIRECT_EXCHANGE = "direct-exchange-a";
/** Name of the first queue bound to the direct exchange. */
public static final String DIRECT_QUEUE_A = "direct-queue-a";
/** Name of the second queue bound to the direct exchange. */
public static final String DIRECT_QUEUE_B = "direct-queue-b";
生产者
/**
 * Declares the durable direct exchange and its two durable queues, binds queue A with
 * routing key {@code direct.a} and queue B with {@code direct.a.b}, then publishes ten
 * messages: even-numbered ones with {@code direct.a}, odd-numbered ones with
 * {@code direct.a.b}.
 *
 * <p>Runs 5 seconds after startup and then with a one-hour delay between runs.
 *
 * @throws IOException if a declare, bind, publish, or close operation fails
 * @throws TimeoutException if closing the channel times out
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
public void producerC() throws IOException, TimeoutException {
    // try-with-resources releases the channel and then the connection even when a
    // broker call throws, so a failed run does not leak a connection.
    try (Connection connection = RabbitConfig.getConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(RabbitConfig.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
        channel.queueDeclare(RabbitConfig.DIRECT_QUEUE_A, true, false, false, null);
        channel.queueDeclare(RabbitConfig.DIRECT_QUEUE_B, true, false, false, null);
        channel.queueBind(RabbitConfig.DIRECT_QUEUE_A, RabbitConfig.DIRECT_EXCHANGE, "direct.a");
        channel.queueBind(RabbitConfig.DIRECT_QUEUE_B, RabbitConfig.DIRECT_EXCHANGE, "direct.a.b");
        for (int i = 1; i <= 10; i++) {
            String message = "Hello World! " + i;
            String routingKey = (i % 2 == 0) ? "direct.a" : "direct.a.b";
            // Encode explicitly as UTF-8: the consumers decode the body as UTF-8.
            channel.basicPublish(RabbitConfig.DIRECT_EXCHANGE, routingKey, null,
                    message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [ Sent ] 消息内容 " + message);
        }
    }
}
消费者C_1、C_2
/**
 * Starts an auto-acknowledging consumer on {@link RabbitConfig#DIRECT_QUEUE_A} that
 * prints each message with the C_1 prefix and pauses 500 ms per message.
 *
 * @throws IOException if the channel cannot be created or the consumer cannot be registered
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
public void consumerC_1() throws IOException {
    final Channel consumerChannel = RabbitConfig.getConnection().createChannel();
    final DefaultConsumer printer = getConsumer(consumerChannel, " [ received@C_1 ] 消息内容 : ", 500);
    consumerChannel.basicConsume(RabbitConfig.DIRECT_QUEUE_A, true, printer);
}
/**
 * Starts an auto-acknowledging consumer on {@link RabbitConfig#DIRECT_QUEUE_B} that
 * prints each message with the C_2 prefix and pauses 1000 ms per message.
 *
 * @throws IOException if the channel cannot be created or the consumer cannot be registered
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
public void consumerC_2() throws IOException {
    final Channel consumerChannel = RabbitConfig.getConnection().createChannel();
    final DefaultConsumer printer = getConsumer(consumerChannel, " [ received@C_2 ] 消息内容 : ", 1000);
    consumerChannel.basicConsume(RabbitConfig.DIRECT_QUEUE_B, true, printer);
}
/**
 * Builds a consumer that prints every delivered body (decoded as UTF-8) behind the given
 * prefix and then sleeps to simulate processing time.
 *
 * @param channel the channel the consumer is attached to
 * @param s       prefix printed in front of each message body
 * @param i       pause after each message, in milliseconds
 * @return a new consumer bound to {@code channel}
 */
private DefaultConsumer getConsumer(Channel channel, String s, int i) {
    return new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(s + new String(body, StandardCharsets.UTF_8) + "!");
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        }
    };
}
6 topic 交换机
运行结果:
/** Name of the topic exchange used by the topic example. */
public static final String TOPIC_EXCHANGE = "topic-exchange-a";
/** Name of the first queue bound to the topic exchange. */
public static final String TOPIC_QUEUE_A = "topic-queue-a";
/** Name of the second queue bound to the topic exchange. */
public static final String TOPIC_QUEUE_B = "topic-queue-b";
/** Name of the third queue bound to the topic exchange. */
public static final String TOPIC_QUEUE_C = "topic-queue-c";
生产消息
/**
 * Declares the durable topic exchange and its three durable queues, binds them with the
 * patterns {@code topic.*.a} (queue A), {@code topic.#} (queue B) and {@code topic.a.*}
 * (queue C), then publishes thirty messages whose routing key cycles through
 * {@code topic.b.b}, {@code topic.a.c} and {@code topic.a.a}.
 *
 * <p>Runs 5 seconds after startup and then with a one-hour delay between runs.
 *
 * @throws IOException if a declare, bind, publish, or close operation fails
 * @throws TimeoutException if closing the channel times out
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
public void producerD() throws IOException, TimeoutException {
    // try-with-resources releases the channel and then the connection even when a
    // broker call throws, so a failed run does not leak a connection.
    try (Connection connection = RabbitConfig.getConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(RabbitConfig.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);
        channel.queueDeclare(RabbitConfig.TOPIC_QUEUE_A, true, false, false, null);
        channel.queueDeclare(RabbitConfig.TOPIC_QUEUE_B, true, false, false, null);
        channel.queueDeclare(RabbitConfig.TOPIC_QUEUE_C, true, false, false, null);
        channel.queueBind(RabbitConfig.TOPIC_QUEUE_A, RabbitConfig.TOPIC_EXCHANGE, "topic.*.a");
        channel.queueBind(RabbitConfig.TOPIC_QUEUE_B, RabbitConfig.TOPIC_EXCHANGE, "topic.#");
        channel.queueBind(RabbitConfig.TOPIC_QUEUE_C, RabbitConfig.TOPIC_EXCHANGE, "topic.a.*");
        for (int i = 1; i <= 30; i++) {
            String message = "Hello World! " + i;
            String routingKey;
            if (i % 3 == 0) {
                routingKey = "topic.a.a";
            } else if (i % 3 == 1) {
                routingKey = "topic.b.b";
            } else {
                routingKey = "topic.a.c";
            }
            // Encode explicitly as UTF-8: the consumers decode the body as UTF-8.
            channel.basicPublish(RabbitConfig.TOPIC_EXCHANGE, routingKey, null,
                    message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [ Sent ] 消息内容 " + message);
        }
    }
}
消费消息
/**
 * Starts an auto-acknowledging consumer on {@link RabbitConfig#TOPIC_QUEUE_A} that
 * prints each message with the D_1 prefix.
 *
 * @throws IOException if the channel cannot be created or the consumer cannot be registered
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
public void consumerD_1() throws IOException {
    final Channel consumerChannel = RabbitConfig.getConnection().createChannel();
    final DefaultConsumer printer = consumer(consumerChannel, " [ received@D_1 ] 消息内容 : ");
    consumerChannel.basicConsume(RabbitConfig.TOPIC_QUEUE_A, true, printer);
}
/**
 * Starts an auto-acknowledging consumer on {@link RabbitConfig#TOPIC_QUEUE_B} that
 * prints each message with the D_2 prefix.
 *
 * @throws IOException if the channel cannot be created or the consumer cannot be registered
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
public void consumerD_2() throws IOException {
    final Channel consumerChannel = RabbitConfig.getConnection().createChannel();
    final DefaultConsumer printer = consumer(consumerChannel, " [ received@D_2 ] 消息内容 : ");
    consumerChannel.basicConsume(RabbitConfig.TOPIC_QUEUE_B, true, printer);
}
/**
 * Starts an auto-acknowledging consumer on {@link RabbitConfig#TOPIC_QUEUE_C} that
 * prints each message with the D_3 prefix.
 *
 * @throws IOException if the channel cannot be created or the consumer cannot be registered
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
public void consumerD_3() throws IOException {
    final Channel consumerChannel = RabbitConfig.getConnection().createChannel();
    final DefaultConsumer printer = consumer(consumerChannel, " [ received@D_3 ] 消息内容 : ");
    consumerChannel.basicConsume(RabbitConfig.TOPIC_QUEUE_C, true, printer);
}
/**
 * Builds a consumer that prints every delivered body, decoded as UTF-8, behind the
 * given prefix and followed by an exclamation mark.
 *
 * @param channel the channel the consumer is attached to
 * @param s       prefix printed in front of each message body
 * @return a new consumer bound to {@code channel}
 */
private DefaultConsumer consumer(Channel channel, String s) {
    return new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            final String payload = new String(body, StandardCharsets.UTF_8);
            final String line = s + payload + "!";
            System.out.println(line);
        }
    };
}
分析
运行结果:



