代码运行需先安装环境依赖
windows环境下:
先安装Erlang 页面:https://www.erlang.org/downloads 下载链接:https://erlang.org/download/otp_win64_24.1.exe 安装完配置环境变量 ERLANG_HOME C:Program Fileserl-24.1 然后在path变量里添加 %ERLANG_HOME%/bin 然后打开cmd命令框,输入erl Eshell V12.1 (abort with ^G) Erlang安装完成 开始安装rabbitmq windows环境下 页面:https://www.rabbitmq.com/install-windows.html 下载链接:https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.7/rabbitmq-server-3.9.7.exe
client端
package com.liang.demo.RabbitMQ;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Client {
public static void main(String[] args) throws Exception {
//连接工厂
ConnectionFactory f = new ConnectionFactory();
f.setHost("ip地址");
f.setUsername("admin");
f.setPassword("admin");
//建立连接
Connection c = f.newConnection();
//建立信道
Channel ch = c.createChannel();
//声明队列,如果该队列已经创建过,则不会重复创建
ch.queueDeclare("task_queue", true, false, false, null);
System.out.println("等待接收数据");
//收到消息后用来处理消息的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: " + msg);
//遍历字符串中的字符,每个点使进程暂停一秒
for (int i = 0; i < msg.length(); i++) {
if (msg.charAt(i) == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
System.out.println("处理结束");
//参数1:消息标签,参数2:是否确认多条消息
ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
//消费者取消时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//一次只能接受一条数据
ch.basicQos(1);
//第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功
ch.basicConsume("task_queue", false, callback, cancel);
}
}
server端
package com.liang.demo.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
public class Server {
public static void main(String[] args) throws Exception {
//创建连接工厂,并设置连接信息
ConnectionFactory f = new ConnectionFactory();
f.setHost("ip地址");
f.setPort(5672);//可选,5672是默认端口
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
//建立信道
Channel ch = c.createChannel();
ch.queueDeclare("task_queue", true, false, false, null);
// ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
// System.out.println("消息已发送");
while (true) {
//控制台输入的消息发送到rabbitmq
System.out.print("输入消息: ");
String msg = new Scanner(System.in).nextLine();
//如果输入的是"exit"则结束生产者进程
if ("exit".equals(msg)) {
break;
}
//参数:exchage,routingKey,props,body;
//MessageProperties.PERSISTENT_TEXT_PLAIN持久化的设置
ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
System.out.println("消息已发送: " + msg);
}
c.close();
}
}
pom添加依赖:
com.rabbitmq
amqp-client
5.4.3



