生产者:
package org.example;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static final String QUEUE_NAME="queue1";
public static void main(String[] args) throws Exception{
// 创建一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
// 工厂ip
factory.setHost("112.124.34.23");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setPort(5672);
// 创建连接
Connection connection=factory.newConnection();
// 获取信道
Channel channel=connection.createChannel();
//AMQChannel(amqp://admin@112.124.16.82:5672/,1)
//System.out.println(channel);
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="HelloWorld";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送完毕");
// 关闭通道和连接
channel.close();
// ***关闭连接***
connection.close();
}
}
消费者:
package org.example;
import com.rabbitmq.client.*;
public class Consumer {
public static final String QUEUE_NAME="queue1";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("112.124.34.23");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection= factory.newConnection();
Channel channel=connection.createChannel();
// 接收和处理消息的回调对象
DeliverCallback deliverCallback=(consumerTag,message)->{
String mes=new String(message.getBody(),"UTF-8");
System.out.println("接收到消息:"+mes);
};
// 消费者取消时的回调对象
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消费被中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
channel.close();
// ***关闭连接***
connection.close();
}
}



