# Connection settings for the RabbitMQ broker used by the examples below.
# The keys must be nested under spring.rabbitmq to be picked up as Spring Boot properties.
spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
事务
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
channel.txSelect()声明启动事务模式;
channel.txCommit()提交事务;
channel.txRollback()回滚事务;
package com.yzm.rabbitmq_10.config;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that publishes a single RabbitMQ {@link Channel} as the bean
 * named {@code channel}.
 */
@Configuration
public class RabbitConfig {

    /**
     * Opens a connection through the given factory and creates one channel on it.
     *
     * @param connectionFactory factory used to obtain the broker connection
     * @return a channel created with the transactional flag set to {@code true}
     */
    @Bean(name = "channel")
    public Channel channel(ConnectionFactory connectionFactory) {
        // true: ask for a transactional channel.
        final boolean transactional = true;
        return connectionFactory.createConnection().createChannel(transactional);
    }
}
package com.yzm.rabbitmq_10.sender;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.MessageProperties;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
/**
 * REST endpoint that demonstrates publishing a message inside an AMQP transaction.
 */
@RestController
public class Sender {

    @Resource(name = "channel")
    private Channel channel;

    /**
     * Publishes one persistent text message to {@code tx.exchange} with routing key
     * {@code tx.yzm} inside a transaction.
     *
     * @param normal pass {@code 1} to simulate a failure after publishing, which makes the
     *               transaction roll back; any other value lets it commit
     * @throws IOException if rolling the transaction back fails
     */
    @GetMapping("/tx")
    public void tx(int normal) throws IOException {
        String message = "Hello world!";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        try {
            // Put the channel into transactional mode.
            channel.txSelect();
            // Publish the message; it only becomes visible to consumers after txCommit().
            channel.basicPublish("tx.exchange", "tx.yzm", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            if (normal == 1) {
                // Deliberate failure used to demonstrate the rollback path.
                throw new ArithmeticException("simulated failure (normal == 1)");
            }
            // This listing declares no logger, so progress is printed like the line above.
            System.out.println("提交事务");
            channel.txCommit();
        } catch (Exception e) {
            System.out.println("事务回滚");
            // Report the cause instead of silently discarding it.
            System.err.println("Transaction failed: " + e);
            channel.txRollback();
        }
    }
}
package com.yzm.rabbitmq_10.receiver;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer side of the transaction demo.
 */
@Component
public class Receiver {

    /**
     * Prints the body of every message delivered from {@code tx_queue}, which is bound to
     * the direct exchange {@code tx.exchange} with routing key {@code tx.yzm}.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "tx_queue"),
            exchange = @Exchange(value = "tx.exchange", type = ExchangeTypes.DIRECT),
            key = {"tx.yzm"}
    ))
    public void receiveC(Message message) {
        // Decode with an explicit charset so the output does not depend on the platform default.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(" [ 消费者@tx号 ] Received ==> '" + body + "'");
    }
}
无异常
Confirm确认模式制造异常
消息回滚,消费者没有消息可消费
Confirm发送方确认模式的使用和事务类似,也是通过设置Channel进行发送方确认的。
Confirm的三种实现方式:
channel.waitForConfirms()普通发送方确认模式;
channel.waitForConfirmsOrDie()批量确认模式;
channel.addConfirmListener()异步监听发送方确认模式;
事务跟Confirm不能同时使用,需要先关闭事务
/**
 * Opens a connection through the given factory and creates one channel on it.
 *
 * @param connectionFactory factory used to obtain the broker connection
 * @return a channel created with the transactional flag set to {@code false}
 */
@Bean(name = "channel")
public Channel channel(ConnectionFactory connectionFactory) {
    // false: the channel is created non-transactional, as required for confirm mode.
    final boolean transactional = false;
    return connectionFactory.createConnection().createChannel(transactional);
}
package com.yzm.rabbitmq_10.sender;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class Sender {
@Resource(name = "channel")
private Channel channel;
@GetMapping("/tx")
public void tx(int normal) throws IOException {
String message = "Hello world!";
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
try {
// 声明事务
channel.txSelect();
// 发送消息
channel.basicPublish("tx.exchange", "tx.yzm", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
int i = 1;
if (normal == 1) i = 1 / 0;
log.info("提交事务");
channel.txCommit();
} catch (Exception e) {
log.info("事务回滚");
channel.txRollback();
}
}
@GetMapping("//confirm/i")
public void /confirm/i() {
try {
String message = "Hello world!";
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
// 开启/confirm/i确认模式
channel./confirm/iSelect();
// 发送消息
channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 等待消息被确认
if (channel.waitFor/confirm/is()) {
System.out.println("生产者确认消费者收到消息");
} else {
// 返回false可以进行补发。
System.out.println("消费者未收到消息,是否重发");
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// channel.waitForConfirms 可能返回超时异常
// 可以进行补发。
}
}
@GetMapping("//confirm/i2")
public void /confirm/i2() {
try {
StringBuilder message = new StringBuilder("Hello world!");
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
// 开启/confirm/i确认模式
channel./confirm/iSelect();
// 发送消息
for (int i = 1; i <= 5; i++) {
message.append("_").append(i);
channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes());
}
// 阻塞线程,等待消息被确认。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。
channel.waitFor/confirm/isOrDie();
System.out.println("只有确认消费者收到消息了才会打印此日志");
} catch (InterruptedException e) {
// 可以进行补发。
} catch (IOException e) {
//
}
}
@GetMapping("//confirm/i3")
public void /confirm/i3() {
try {
StringBuilder message = new StringBuilder("Hello world!");
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
// 开启/confirm/i确认模式
channel./confirm/iSelect();
// 发送消息
for (int i = 1; i <= 5; i++) {
message.append("_").append(i);
channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes());
}
//异步监听确认和未确认的消息
channel.add/confirm/iListener(new /confirm/iListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("已确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple);
}
});
} catch (IOException e) {
//
}
}
}
package com.yzm.rabbitmq_10.receiver;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumers for the transaction demo and the publisher-confirm demo.
 */
@Component
public class Receiver {

    /**
     * Prints the body of every message delivered from {@code tx_queue}, which is bound to
     * the direct exchange {@code tx.exchange} with routing key {@code tx.yzm}.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "tx_queue"),
            exchange = @Exchange(value = "tx.exchange", type = ExchangeTypes.DIRECT),
            key = {"tx.yzm"}
    ))
    public void receiveC(Message message) {
        // Decode with an explicit charset so the output does not depend on the platform default.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(" [ 消费者@tx号 ] Received ==> '" + body + "'");
    }

    /**
     * Prints the body of every message delivered from the queue bound to the fanout
     * exchange used by the confirm endpoints.
     *
     * <p>NOTE(review): the queue and exchange names look mangled (probably meant
     * "confirm_queue" and "confirm.exchange"). They are kept byte-for-byte because the
     * publisher uses the same exchange literal; change both sides together.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "/confirm/i_queue"),
            exchange = @Exchange(value = "/confirm/i.exchange", type = ExchangeTypes.FANOUT)
    ))
    public void receiveD(Message message) {
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(" [ 消费者@cf号 ] Received ==> '" + body + "'");
    }
}
普通Confirm模式
http://localhost:8080//confirm/i
运行结果:
生产者生产消息,发送到消费者,消费者确认后,生产者收到确认
批量Confirm模式
http://localhost:8080//confirm/i2
运行结果:
相关链接异步Confirm模式
http://localhost:8080//confirm/i3
运行结果:
首页
上一篇:Channel
下一篇:集群搭建管理



