一、基于Docker安装
# 1. 安装: 后台及前端页面管理
docker run -it --rm -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
# 2. 管理页面
username:guest
password:guest
# 3. 依赖管理
implementation group: 'com.rabbitmq', name: 'amqp-client', version: '5.14.0'
二、基本使用
- 一个生产者,多个消费者
- 一个消息只会被消费一次
- 多个消费者之间:轮询消费
1. 工具类
package com.day.dreamer.queue;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Connection helper shared by the producer and consumer examples.
 */
public class RabbitMqUtils {
    /**
     * Opens a new connection to the RabbitMQ broker using the host and credentials set below.
     *
     * @return a newly opened connection; this method does not close it, so the caller must
     * @throws RuntimeException if the connection cannot be established; the underlying failure
     *     is attached as the cause
     */
    public static Connection getConnection() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("60.205.229.31");
        factory.setUsername("guest");
        factory.setPassword("guest");
        try {
            return factory.newConnection();
        } catch (Exception e) {
            // Keep the original exception as the cause so the real reason
            // (refused connection, timeout, bad credentials, ...) is not lost.
            throw new RuntimeException(
                    "Failed to connect to RabbitMQ at " + factory.getHost(), e);
        }
    }
}
2. producer
package com.day.dreamer.queue;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import static com.day.dreamer.queue.RabbitMqUtils.getConnection;
/**
 * Example producer: declares the queue and publishes a single message to it.
 */
public class Producer {
    private static final String QUEUE_NAME = "erick";

    /**
     * Opens a connection and a channel, publishes one message, then closes both.
     *
     * @param args unused
     * @throws IOException if the channel cannot be created, or if the channel or connection
     *     cannot be closed cleanly
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the channel first and then the connection,
        // whether or not publishing succeeded.
        try (Connection connection = getConnection();
             Channel channel = connection.createChannel()) {
            sendMessage(channel);
        } catch (java.util.concurrent.TimeoutException e) {
            // Channel.close() declares TimeoutException; wrap it so the signature of main
            // stays unchanged.
            throw new IOException("Timed out while closing the RabbitMQ channel", e);
        }
    }

    /**
     * Declares the queue and publishes the text {@code hello} to it.
     *
     * @param channel an open channel to publish on
     * @throws RuntimeException if declaring the queue or publishing fails; the underlying
     *     {@link IOException} is attached as the cause
     */
    public static void sendMessage(Channel channel) {
        try {
            // Arguments: durable=false, exclusive=false, autoDelete=true, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            // The empty exchange name selects the default exchange, which routes by queue name.
            // The charset is explicit so the payload does not depend on the platform default.
            channel.basicPublish("", QUEUE_NAME, null,
                    "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new RuntimeException("Failed to publish to queue '" + QUEUE_NAME + "'", e);
        }
    }
}
3. consumer
package com.day.dreamer.queue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import static com.day.dreamer.queue.RabbitMqUtils.getConnection;
/**
 * Example consumer: subscribes to the queue with automatic acknowledgement and prints
 * every message it receives.
 */
public class Consumer {
    private static final String QUEUE_NAME = "erick";

    /**
     * Opens a connection and a channel and registers a consumer on the queue.
     * On success the connection is deliberately left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if the channel cannot be created, the queue cannot be declared,
     *     or the consumer cannot be registered
     */
    public static void main(String[] args) throws IOException {
        Connection connection = getConnection();
        try {
            Channel channel = connection.createChannel();
            // Declare the queue with the same arguments the producer uses
            // (durable=false, exclusive=false, autoDelete=true) so that subscribing also
            // works when the consumer is started before the producer.
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            // Second argument true = automatic acknowledgement.
            channel.basicConsume(QUEUE_NAME, true,
                    (consumerTag, delivery) -> System.out.println("consumer success:"
                            + new String(delivery.getBody(),
                                    java.nio.charset.StandardCharsets.UTF_8)),
                    consumerTag -> System.out.println("consumer cancel:" + consumerTag));
        } catch (IOException e) {
            // Setup failed: release the connection instead of leaving it open with no
            // consumer attached, and report the failure to the caller rather than just
            // printing it. abort() discards any errors raised while closing.
            connection.abort();
            throw e;
        }
    }
}
4. 应答
- 自动应答:消费者接收消息后,不管是否处理成功,消息立即删除
- 手动应答:消费者接收并处理完成后,手动应答,消息才会删除,可以设置消息重新入队列
5. 持久化
- 消息会保存在message broker里面,重新启动后,会将消息从数据库恢复到队列中
- 队列持久化及消息持久化