栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMq入门3

RabbitMq入门3

3.1 Rabbitmq连接
1 用来在给定的参数(l 地址、端口号、用户名、密码等)

ConnectionFactory factory = new ConnectionFactory(); 
factory.setUsername(USERNAME); 
factory.setPassword(PASSWORD); 
factory.setVirtualHost(virtualHost) ; 
factory.setHost(IP ADDRESS); 
factory . setPort(PORT) ; 
Connection conn = factory.newConnection();
2 完整的url连接
ConnectionFactory factory =口ew ConnectionFactory(); 
factory.setUri( " amqp:lluserName : password@ipAddress:portNumber/virtualHost"); 
Connection conn = factory.newConnection(); 
Connection 接口被用来创建一个 Channel:
Channel channel = conn.createChannel();

注意:connection可以创建多个channel,channel会存在并发问题,会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认机制的运行,应用程序应为每个线程开启一个channel

3.2 使用交换器和队列

channel.exchangeDeclare (exchangeName , "direct" , true) ; 
channel.queueDeclare(queueName , true , false , false , null); 
channel.queueBind(queueName , exchangeName , routingKey) ;

exchangeDeclare 方法详解:
大部分方法都是基于这个方法,只不过缺省部分参数

//声明交换机
Exchange.DeclareOk exchangeDeclare(String exchange , 
String type , boolean durable , 
boolean autoDelete , boolean internal, 
Map arguments) throws IOException ;
//参数详解:
exchange :交换机名称
type:交换机类型
durable :是否持久化,持久化指会存盘,重启不会丢失
autoDelete :是否自动删除,指前提是有交换机或队列与之绑定,然后全部解绑,会自动删除
internal:是否内置,如果内置则必须要将消息由交换机路由到交换机
arguments:其他一些结构化参数

queueDeclare方法详解

//声明队列
Queue. DeclareOk queueDeclare (String queue , boolean durable , boolean exclusive , 
boolean autoDelete , Map arguments) throws IOException
//无参数的,也叫匿名绑定
Queue.DeclareOk queueDec1are() throws IOException;
//参数详解
queue:队列名称
durable:是否持久化
exclusive:是否排他,
	1.排他队列仅对第一次声明的连接可见,同一连接的不同信道可以同时访问
	2.其他连接不可声明同名的排他队列
	3.不管是否设置持久化,连接断开或者客户端退出,自动删除,适用于一个客户端同时发送读取消息的场景
autoDelete :是否自动删除,消费者所有与这个队列连接解绑,自动删除
arguments:设置队列的其他一些参数;如x-rnessage-ttl x-expires

注意:生产者,消费者都可以通过queueDeclare 声明队列,但是如果消费者已经在一个信道上绑定了队列,则无法声明新的队列,先解绑才能声明

queueBind 方法详解

//交换机队列绑定
Queue.BindOk queueBind(String queue , String exchange , String routingKey, Map arguments) throws IOException;
//参数详解
queue:队列名称
exchange:交换机名称
routingKey:队列与交换机绑定的路由键
arguments:结构化参数
//解绑方法
Queue . UnbindOk queueUnbind (String queue , String exchange , String routingKey) throws IOException;

exchangeBind 方法详解

//交换机相互绑定
Exchange.BindOk exchangeBind(String destination,String source, String routingKey, Map arguments) throws IOException;
//参数详解
destination:接收的交换器名称
source:发送的交换器名称
routingKey:绑定路由键
arguments:结构化参数

3.3 发送消息

void basicPublish(Stri 口 g exchange , String routingKey, boolean mandatory, boolean immediate , BasicProperties props, byte[] body) throws IOException ;
//参数详解
exchangeName:交换机名称
routingKey :路由键
props:一些消息的属性设置
channe1.basicPub1ish(exchangeName , routingKey , 
new AMQP BasicProperties.Bui1der () 
	.contentType( " text/p1ain" ) //内容类型
	.de1iveryMode(2)//投递模式为2,消息会被持久化带磁盘中
	.priority (1) //优先级
	.userld( " hidden " )
	.headers(headers) //消息带headers
	. expiration( " 60000 " )//消息过期时间
	.build()) , 
	messageBodyBytes) ;
body:真正的消息
mandatory:详解见4.1
immediate:详解见4.1

3.4 消费消息

消费模式分两种:
推模式:Basic.Consume,可以通过持续订阅的方式来消费消息
拉模式:Basic.Get,可以单条地获取消息

//推模式方法的全部参数
String basicConsume(String queue , boolean autoAck, String consumerTag, boolean noLocal , boolean exclusive , Map arguments, Consumer callback) throws IOException ;
//参数详解
queue:队列名称
autoAck:是否自动确认,一般为否
consumerTag:消费者标签,用来区分多个消费者
noLocal:设置为 true 则表示不能将同一个 Connectio口中生产者发送的消息传送给这个 Connection 中的消费者:
exclusive :是否排他
arguments:设置消费者的其他参数
callback 设置消费者的回调函数。用来处理 RabbitMq 推送过来的消息,比如
DefaultConsumer 使用时需要客户端重写 (overrie) 其中的方法。

//拉模式方法
GetResponse basicGet(String queue , boolean autoAck) throws IOException;
queue :队列名称
autoAck:true为自动接收消息,false为手动接收

推模式和拉模式区别:
推模式通过持续订阅消费消息,但是仍然会受Basic.Qos的限制;拉模式通过获取单条消息,如果通过循环做到和推模式的相同效果,将会非常影响rabbitmq的性能,高吞吐量还是使用推模式

3.5 消费端的确认与拒绝
确认:
只要消费端的自动ack为false,消息将会一直被确认为止才会清除,即使消费端宕机,消息也会重新投递,不会丢失;判断是否重新投递的唯一依据是消费端是否断开
拒绝:

//Channel 类中的 basicReject 方法定义如下:一次移除一个
void basicReject(long deliveryTag, boolean requeue) throws IOException;
参数详解:
deliveryTag:消息编号,64位长整型
requeue:为true,重新回到队列,给下一个消费者;false则直接移除

//channel.basicNack方法定义如下:批量移除
void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException;
参数详解:
deliveryTag:消息编号,64位长整型
multiple:为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。
requeue:为true,重新回到队列,给下一个消费者;false则直接移除

//Basic.Recover 具备可重入队列的特性,
//basicRecover  用来请求 RabbitMQ 重新发送还未被确认的消息
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
requeue:默认为true,为 true ,则未被确认的消息会被重新加入到队列中,这样就可能投递给不同消费者;为false,只会分给原来相同的消费者

3.6 关闭连接

//可以显式关闭,实际上connection关闭,channel也会关闭
channel.close(); 
conn.close() ;

Connection和 Channel 所具备的生命周期
open:开启状态,对象可用
closing:正在关闭状态
closed:已经关闭

获取Connection和 Channel 关闭的原因:
给这个对象添加一个ShutdownListener ,可以通过的到ShutdownSgnalException ,得到关闭原因;
当 Connection 或者 Channel 的状态转变为 Closed 的时候会调用ShutdownListener 。而且如果将一个 ShutdownListener 注册到一个己经处于 Closed状态的对象(这里特指 Connection Channel 对象)时,会立刻调用 ShutdownListener

connection.addShutdownListener(new ShutdownListener() { 
public void shutdownCompleted (ShutdownSgnalException cause)
} 
) ) 
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/312674.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号