jdk1.8构建一个maven工程导入rabbitmq的maven依赖启动rabbitmq-server服务定义生产者定义消费者观察消息的在rabbitmq-server服务中的过程 2、构建一个maven工程
3、导入rabbitmq依赖 1)Java原生依赖2)spring依赖com.rabbitmq amqp-client 5.14.2
3)springboot依赖org.springframework.amqp spring-amqp 2.4.2 org.springframework.amqp spring-rabbit 2.4.2
org.springframework.boot spring-boot-starter-amqp 2.6.4
4、启动rabbitmq-server服务可以根据自已的项目环境选择上面三种依赖形式
systemctl start rabbitmq-server # 或者 docker启动 docker start myrabbit5、定义生产者
package com.tuwer.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.19.101");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2、创建连接
Connection connection = null;
// 3、获取通道
Channel channel = null;
try {
connection = connectionFactory.newConnection("生产者");
channel = connection.createChannel();
// 4、通过通道声明队列queue存储消息
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
// 5、准备消息
String message = "Hello World! " + LocalDateTime.now();
// 6、发送消息
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
6、定义消费者
package com.tuwer.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.19.101");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2、创建连接
Connection connection = null;
// 3、获取通道
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者");
channel = connection.createChannel();
// 4、通过通道接收消息
String queueName = "queue1";
channel.basicConsume(
queueName,
true,
new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到的消息:" + new String(message.getBody()));
}
},
new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失败!");
}
}
);
// 循环读取
System.out.println("开始接收消息....");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 5、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 6、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
lambada表达式简便写法
// 省略...
// 4、通过通道接收消息
String queueName = "queue1";
// 接收处理
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("收到的消息:" + new String(message.getBody()));
};
// 接收失败处理
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息接收失败!");
};
channel.basicConsume(
queueName,
true,
deliverCallback,
cancelCallback
);
// 省略...



