一、简单的生产者和消费者
生产者:
package com.xiaoxu.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producter {
//定义一个消息队列
static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("39.101.204.74");//设置主机地址
connectionFactory.setPort(5672);//端口号
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2. 创建连接;(抽取一个获取连接的工具类)
Connection connection = connectionFactory.newConnection();
// 3. 创建频道;
Channel channel = connection.createChannel();
//4. 声明队列;
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
// 5. 创建消费者(接收消息并处理消息);
String msg = "我是一个消息。。。。。";
// 6. 监听队列
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println(msg);
//7、关闭资源
channel.close();
connection.close();
}
}
消费者:
package com.xiaoxu.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtis.getConnection();
//创建频道
Channel channel = connection.createChannel();
// create queue
channel.queueDeclare(Producter.QUEUE_NAME,true,false,false,null);
//creat consumer and set information
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException{
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
}
};
//listen to the message
channel.basicConsume(Producter.QUEUE_NAME,false,consumer);
}
}
提取的连接工具:
package com.xiaoxu.simple;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtis {
public static Connection getConnection() throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("39.101.204.74");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//创建连接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
三、测试
先开启消费者一直监听消息队列,再开启生产者。



