生产者多消费者消费模式
消费通道每次获取一个消息:channel.basicQos(1);
channel.basicConsume(“msg”,false,new DefaultConsumer(channel){
autoAck 关闭自动确认消息
获取消息后手动确认
package wordqueues;
import com.jia.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import java.io.IOException;
public class Provider {
@Test
public void sendMsg() throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("msg",false,false,false,null);
for (int i = 0; i < 200; i++) {
channel.basicPublish("","msg", MessageProperties.PERSISTENT_TEXT_PLAIN,("消息"+i).getBytes());
}
RabbitMQUtils.closeConnectAndChanel(channel,connection);
}
}
消费者
package com.jia.wordqueues;
import com.jia.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
final Channel channel = connection.createChannel();
//通道只取一个消息
channel.basicQos(1);
channel.queueDeclare("msg",false,false,false,null);
channel.basicConsume("msg",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
//手动确认消息标识 multiple 是否确认多个 false 每次确认一个
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
package com.jia.wordqueues;
import com.jia.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = RabbitMQUtils.getConnection();
final Channel channel = connection.createChannel();
//通道只取一个消息
channel.basicQos(1);
//队列声明
channel.queueDeclare("msg",false,false,false,null);
channel.basicConsume("msg",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String(body));
//手动确认消息标识 multiple 是否确认多个 false 每次确认一个
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}



