- 抽取工具类
package utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMqUtils {
//得到一个连接的channel
public static Channel getChannel() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂ip 链接mQ队列
factory.setHost("124.220.3.19");
//用户名
factory.setUsername("root");
//密码
factory.setPassword("root");
//创建链接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
return channel;
}
}
消费者代码
package two;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Worker01 {
//队列的名称
public static final String QUEUE_NAME = "hello";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//消息的接收
//声明_接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息" + new String(message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
};
System.out.println("C2_等待接收消息......");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
生产者代码
package two;
import com.rabbitmq.client.Channel;
import utils.RabbitMqUtils;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Task01 {
//队列的名称
public static final String QUEUE_NAME = "hello";
//发送大量的消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//从控制台中接收信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成:" + message);
}
}
}
运行结果
-
生产者发送消息
-
消费者1
-
消费者2
- 睡眠工具类
package utils;
public class SleepUtils {
public static void sleep(int second) {
try {
Thread.sleep(1000 * second);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- 生产者类
package three;
import com.rabbitmq.client.Channel;
import utils.RabbitMqUtils;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Task02 {
//队列的名称
public static final String TASK_QUEUE_NAME = "ack_queue";
//发送大量的消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
//从控制台中接收信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("发送消息完成:" + message);
}
}
}
- 消费者1
package three;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;
import utils.SleepUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Worker03 {
//队列的名称
public static final String TASK_QUEUE_NAME = "ack_queue";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1_等待接收消息_时间较短......");
//消息的接收
//声明_接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
SleepUtils.sleep(1);
System.out.println("接收到的消息" + new String(message.getBody()));
//手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
- 消费者2
package three;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;
import utils.SleepUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Worker04 {
//队列的名称
public static final String TASK_QUEUE_NAME = "ack_queue";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2_等待接收消息_时间较长......");
//消息的接收
//声明_接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
SleepUtils.sleep(30);
System.out.println("接收到的消息" + new String(message.getBody()));
//手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
- 运行结果



