3.1 Rabbitmq连接
1 用来在给定的参数(l 地址、端口号、用户名、密码等)
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost(virtualHost) ; factory.setHost(IP ADDRESS); factory . setPort(PORT) ; Connection conn = factory.newConnection();
2 完整的url连接
ConnectionFactory factory =口ew ConnectionFactory(); factory.setUri( " amqp:lluserName : password@ipAddress:portNumber/virtualHost"); Connection conn = factory.newConnection(); Connection 接口被用来创建一个 Channel: Channel channel = conn.createChannel();
注意:connection可以创建多个channel,channel会存在并发问题,会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认机制的运行,应用程序应为每个线程开启一个channel
3.2 使用交换器和队列
channel.exchangeDeclare (exchangeName , "direct" , true) ; channel.queueDeclare(queueName , true , false , false , null); channel.queueBind(queueName , exchangeName , routingKey) ;
exchangeDeclare 方法详解:
大部分方法都是基于这个方法,只不过缺省部分参数
//声明交换机 Exchange.DeclareOk exchangeDeclare(String exchange , String type , boolean durable , boolean autoDelete , boolean internal, Maparguments) throws IOException ; //参数详解: exchange :交换机名称 type:交换机类型 durable :是否持久化,持久化指会存盘,重启不会丢失 autoDelete :是否自动删除,指前提是有交换机或队列与之绑定,然后全部解绑,会自动删除 internal:是否内置,如果内置则必须要将消息由交换机路由到交换机 arguments:其他一些结构化参数
queueDeclare方法详解
//声明队列 Queue. DeclareOk queueDeclare (String queue , boolean durable , boolean exclusive , boolean autoDelete , Maparguments) throws IOException //无参数的,也叫匿名绑定 Queue.DeclareOk queueDec1are() throws IOException; //参数详解 queue:队列名称 durable:是否持久化 exclusive:是否排他, 1.排他队列仅对第一次声明的连接可见,同一连接的不同信道可以同时访问 2.其他连接不可声明同名的排他队列 3.不管是否设置持久化,连接断开或者客户端退出,自动删除,适用于一个客户端同时发送读取消息的场景 autoDelete :是否自动删除,消费者所有与这个队列连接解绑,自动删除 arguments:设置队列的其他一些参数;如x-rnessage-ttl x-expires
注意:生产者,消费者都可以通过queueDeclare 声明队列,但是如果消费者已经在一个信道上绑定了队列,则无法声明新的队列,先解绑才能声明
queueBind 方法详解
//交换机队列绑定 Queue.BindOk queueBind(String queue , String exchange , String routingKey, Maparguments) throws IOException; //参数详解 queue:队列名称 exchange:交换机名称 routingKey:队列与交换机绑定的路由键 arguments:结构化参数 //解绑方法 Queue . UnbindOk queueUnbind (String queue , String exchange , String routingKey) throws IOException;
exchangeBind 方法详解
//交换机相互绑定 Exchange.BindOk exchangeBind(String destination,String source, String routingKey, Maparguments) throws IOException; //参数详解 destination:接收的交换器名称 source:发送的交换器名称 routingKey:绑定路由键 arguments:结构化参数
3.3 发送消息
void basicPublish(Stri 口 g exchange , String routingKey, boolean mandatory, boolean immediate , BasicProperties props, byte[] body) throws IOException ; //参数详解 exchangeName:交换机名称 routingKey :路由键 props:一些消息的属性设置 channe1.basicPub1ish(exchangeName , routingKey , new AMQP BasicProperties.Bui1der () .contentType( " text/p1ain" ) //内容类型 .de1iveryMode(2)//投递模式为2,消息会被持久化带磁盘中 .priority (1) //优先级 .userld( " hidden " ) .headers(headers) //消息带headers . expiration( " 60000 " )//消息过期时间 .build()) , messageBodyBytes) ; body:真正的消息 mandatory:详解见4.1 immediate:详解见4.1
3.4 消费消息
消费模式分两种:
推模式:Basic.Consume,可以通过持续订阅的方式来消费消息
拉模式:Basic.Get,可以单条地获取消息
//推模式方法的全部参数 String basicConsume(String queue , boolean autoAck, String consumerTag, boolean noLocal , boolean exclusive , Maparguments, Consumer callback) throws IOException ; //参数详解 queue:队列名称 autoAck:是否自动确认,一般为否 consumerTag:消费者标签,用来区分多个消费者 noLocal:设置为 true 则表示不能将同一个 Connectio口中生产者发送的消息传送给这个 Connection 中的消费者: exclusive :是否排他 arguments:设置消费者的其他参数 callback 设置消费者的回调函数。用来处理 RabbitMq 推送过来的消息,比如 DefaultConsumer 使用时需要客户端重写 (overrie) 其中的方法。 //拉模式方法 GetResponse basicGet(String queue , boolean autoAck) throws IOException; queue :队列名称 autoAck:true为自动接收消息,false为手动接收
推模式和拉模式区别:
推模式通过持续订阅消费消息,但是仍然会受Basic.Qos的限制;拉模式通过获取单条消息,如果通过循环做到和推模式的相同效果,将会非常影响rabbitmq的性能,高吞吐量还是使用推模式
3.5 消费端的确认与拒绝
确认:
只要消费端的自动ack为false,消息将会一直被确认为止才会清除,即使消费端宕机,消息也会重新投递,不会丢失;判断是否重新投递的唯一依据是消费端是否断开
拒绝:
//Channel 类中的 basicReject 方法定义如下:一次移除一个 void basicReject(long deliveryTag, boolean requeue) throws IOException; 参数详解: deliveryTag:消息编号,64位长整型 requeue:为true,重新回到队列,给下一个消费者;false则直接移除 //channel.basicNack方法定义如下:批量移除 void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException; 参数详解: deliveryTag:消息编号,64位长整型 multiple:为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。 requeue:为true,重新回到队列,给下一个消费者;false则直接移除 //Basic.Recover 具备可重入队列的特性, //basicRecover 用来请求 RabbitMQ 重新发送还未被确认的消息 Basic.RecoverOk basicRecover(boolean requeue) throws IOException; requeue:默认为true,为 true ,则未被确认的消息会被重新加入到队列中,这样就可能投递给不同消费者;为false,只会分给原来相同的消费者
3.6 关闭连接
//可以显式关闭,实际上connection关闭,channel也会关闭 channel.close(); conn.close() ;
Connection和 Channel 所具备的生命周期
open:开启状态,对象可用
closing:正在关闭状态
closed:已经关闭
获取Connection和 Channel 关闭的原因:
给这个对象添加一个ShutdownListener ,可以通过的到ShutdownSgnalException ,得到关闭原因;
当 Connection 或者 Channel 的状态转变为 Closed 的时候会调用ShutdownListener 。而且如果将一个 ShutdownListener 注册到一个己经处于 Closed状态的对象(这里特指 Connection Channel 对象)时,会立刻调用 ShutdownListener
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted (ShutdownSgnalException cause)
}
) )



