栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ工具类

RabbitMQ工具类

API 连接相关

获取通道

getChannel()

关闭连接

close(通道)
生产消费相关

发送简单消息

easySend(通道, 队列名, 数据字符串)

交换机发送消息

exchangeSend(通道, 交换机名, 路由键, 消息字符串)

竞争消费消息

easyReceive(通道, 队列名,传递回调, 取消回调)

三种消费应答(确认,拒绝,丢弃)

receiveOK(通道, Delivery)
receiveReturn(通道, Delivery)
receiveDestroy(通道, Delivery)
结构生成相关

创建队列

createQuere(通道, 队列名, 是否持久)

创建指定死信的队列

createQuere(通道, 队列名, 是否持久, 超时秒数, 死信交换机名, 死信路由键)

创建交换机

createExchange(通道, 交换机名, 交换机类型, 是否持久)

绑定交换机与队列

bingExchange(通道, 交换机名, 队列名, 路由键)
完整代码
public class RabbitUtil {
	//获取连接
	public static Channel getChannel() throws IOException, TimeoutException {
		ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("localhost");
		cf.setUsername("guest");
		cf.setPassword("guest");
		return cf.newConnection().createChannel();
	}

	//关闭连接(通道)
	public static void close(Channel channel) throws IOException {
		channel.getConnection().close();
	}

	//发送简单消息(通道,队列名,消息字符串)
	public static void easySend(Channel channel, String queueName, String message)
			throws IOException {
		
		channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
	}

	//交换机发送(通道,交换机名,路由键,消息字符串)
	public static void exchangeSend(Channel channel, String exchangeName, String key, String message) throws IOException {
		
		channel.basicPublish(exchangeName, key, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
	}

	//竞争消费消息(通道,队列名,传递回调,取消回调)
	public static void easyReceive(Channel channel, String queueName, DeliverCallback deliver, CancelCallback cancel)
			throws IOException {
		
		channel.basicQos(1);
		channel.basicConsume(queueName, false, deliver, cancel);
	}

	//三种消费应答(确认,拒绝,丢弃)
	public static void receiveOK(Channel channel, Delivery message) throws IOException {
		channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
	}

	public static void receiveReturn(Channel channel, Delivery message) throws IOException {
		channel.basicReject(message.getEnvelope().getDeliveryTag(), true);
	}

	public static void receiveDestroy(Channel channel, Delivery message) throws IOException {
		channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
	}

	//创建队列(通道,队列名,是否持久)
	public static void createQuere(Channel channel, String queueName, Boolean durable) throws IOException {
		
		channel.queueDeclare(queueName, durable, false, false, null);
	}

	//创建指定死信的队列(通道,队列名,是否持久,超时秒数,死信交换机名,死信路由键)
	public static void createQuere(Channel channel, String queueName, Boolean durable, Integer second, String exchangeName, String key) throws IOException {

		HashMap arguments = new HashMap<>();
		//过期时间
		arguments.put("x-message-ttl", second * 1000);
		//最大长度
		//arguments.put("x-max-length", length);
		//目标交换机
		arguments.put("x-dead-letter-exchange", exchangeName);
		//目标路由键
		arguments.put("x-dead-letter-routing-key", key);
		channel.queueDeclare(queueName, durable, false, false, arguments);
	}

	//创建交换机(通道,交换机名,交换机类型,是否持久)
	public static void createExchange(Channel channel, String name, String type, Boolean durable) throws IOException, TimeoutException {
		channel.exchangeDeclare(name, type, durable);
	}

	//绑定交换机与队列
	public static void bingExchange(Channel channel, String exchangeName, String queueName, String key) throws IOException, TimeoutException {
		channel.queueBind(queueName, exchangeName, key);
	}
}
示例代码

结构生成示例:

public static void main(String[] args) throws IOException, TimeoutException {
	Channel c = getChannel();
	//创建一个队列
	createQuere(c, "q1", true);
	close(c);
}

生产者示例:

public static void main(String[] args) throws Exception {
	//获取连接
	Channel channel = RabbitUtil.getChannel();
	//发布确认
	channel./confirm/iSelect();
	/confirm/iCallback ok = (deliveryTag, multiple) -> {
		System.out.println("成功:" + deliveryTag);
	};
	/confirm/iCallback no = (deliveryTag, multiple) -> {
		System.out.println("失败:" + deliveryTag);
	};
	channel.add/confirm/iListener(ok, no);
	//发送消息
	RabbitUtil.easySend(channel, "q1", "你好,沃德!");
}

消费者示例:

public static void main(String[] args) throws Exception {
	//获取连接
	Channel channel = RabbitUtil.getChannel();
	//两个回调
	DeliverCallback deliver = (consumerTag, message) -> {
		System.out.print("消费成功:" + new String(message.getBody()));
		RabbitUtil.receiveOK(channel, message);
	};
	CancelCallback cancel = consumerTag -> {
		System.out.println("消息被取消了");
	};
	//开始消费
	RabbitUtil.easyReceive(channel, "q1", deliver, cancel);
}

示例运行结果:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/698404.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号