Vert.x RabbitMQ客户端,允许应用程序与 RabbitMQ 代理交互的 Vert.x 客户端 (AMQP 0.9.1)
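1. Maven 项目依赖
<dependencies>
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web</artifactId>
  </dependency>
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-rabbitmq-client</artifactId>
  </dependency>
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-config-yaml</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
  </dependency>
  <dependency>
    <groupId>com.lance.common</groupId>
    <artifactId>vertx-common-core</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </dependency>
</dependencies>
2. YAML 文件配置
server:
  host: 127.0.0.1
  port: 18005
rabbit:
  host: 127.0.0.1
  port: 18003
  username: root
  password: root
  virtualHost: /root
  connectionTimeout: 6000
  requestedHeartbeat: 60
  handshakeTimeout: 6000
  requestedChannelMax: 5
  networkRecoveryInterval: 500
  automaticRecoveryEnabled: true
3. 启动时加载配置文件,并放入到 config 中去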
/**
 * Application entry point.
 *
 * <p>Reads {@code application.yaml}, starts the RabbitMQ client from its {@code rabbit} section
 * and then deploys {@link MainApp} with the full configuration.
 */
public class RabbitmqApplication {
  /** Key of the configuration section handed to {@link ClientHelper}. */
  private static final String RABBIT_SECTION = "rabbit";

  /**
   * Boots Vert.x, loads the configuration asynchronously and wires up the application.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    ConfigRetriever retriever = readYaml(vertx);
    retriever.getConfig(json -> {
      // The retrieval itself may fail (e.g. unparsable YAML); result() would be null then.
      if (json.failed()) {
        abort(vertx, "Unable to load application.yaml: " + json.cause());
        return;
      }

      JsonObject object = json.result();
      // The store is optional, so a missing file yields an empty config without a "rabbit" section.
      JsonObject rabbit = object.getJsonObject(RABBIT_SECTION);
      if (rabbit == null) {
        abort(vertx, "Missing '" + RABBIT_SECTION + "' section in application.yaml");
        return;
      }

      ClientHelper dbHelper = new ClientHelper(rabbit, vertx);
      dbHelper.afterPropertiesSet();

      DeploymentOptions options = new DeploymentOptions().setConfig(object);
      vertx.deployVerticle(MainApp.class.getName(), options);
    });
  }

  /**
   * Builds a retriever backed by a single optional YAML file store.
   *
   * @param vertx the Vert.x instance the retriever is bound to
   * @return a retriever that reads {@code application.yaml}
   */
  private static ConfigRetriever readYaml(Vertx vertx) {
    ConfigStoreOptions store = new ConfigStoreOptions()
        .setType("file")
        .setFormat("yaml")
        .setOptional(true)
        .setConfig(new JsonObject().put("path", "application.yaml"));
    return ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(store));
  }

  /** Reports a fatal startup problem and shuts Vert.x down so the JVM can exit. */
  private static void abort(Vertx vertx, String reason) {
    System.err.println(reason);
    vertx.close();
  }
}
4.RabbitMQ client连接配置
public class ClientHelper {
private final JsonObject object;
private final Vertx vertx;
@Getter
private static RabbitMQClient client;
public void afterPropertiesSet() {
ConfigProperties.MqProperties prop = object.mapTo(ConfigProperties.MqProperties.class);
RabbitMQOptions config = new RabbitMQOptions();
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.setUser(prop.getUsername());
config.setPassword(prop.getPassword());
config.setHost(prop.getHost());
config.setPort(prop.getPort());
config.setVirtualHost(prop.getVirtualHost());
config.setReconnectAttempts(prop.getReconnectAttempts());
config.setReconnectInterval(prop.getReconnectInterval());
config.setConnectionTimeout(prop.getConnectionTimeout());
config.setRequestedHeartbeat(prop.getRequestedHeartbeat());
config.setHandshakeTimeout(prop.getHandshakeTimeout());
config.setRequestedChannelMax(prop.getRequestedChannelMax());
config.setNetworkRecoveryInterval(prop.getNetworkRecoveryInterval());
config.setAutomaticRecoveryEnabled(prop.isAutomaticRecoveryEnabled());
client = RabbitMQClient.create(vertx, config);
// Connect
client.start(asyncResult -> {
if (asyncResult.succeeded()) {
consumer();
log.warn("RabbitMQ successfully connected!");
} else {
log.error("Fail to connect to RabbitMQ {}", asyncResult.cause().getMessage());
}
});
}
private void consumer() {
client.basicConsumer(MqConst.HELLO_ROUTING_KEY, result -> {
if (result.succeeded()) {
RabbitMQConsumer mqConsumer = result.result();
mqConsumer.handler(message -> {
log.info("Got message: {}", message.body().toString());
log.info("Message[exchange: {}, routeKey: {}] receive success!", message.envelope().getExchange(), message.envelope().getRoutingKey());
});
} else {
log.error("===>Queue receive fail: ", result.cause());
}
});
}
}
5. 消息发送执行实例
public class UserService {
public void sendMessage(RoutingContext ctx) {
UserVo userVo = ctx.getBodyAsJson().mapTo(UserVo.class);
Buffer message = Buffer.buffer(Json.encode(userVo));
ClientHelper.getClient().basicPublish(MqConst.HELLO_EXCHANGE, MqConst.HELLO_ROUTING_KEY, message, result -> {
if (result.succeeded()) {
log.info("Message[exchange: {}, routeKey: {}] published success!", MqConst.HELLO_EXCHANGE, MqConst.HELLO_ROUTING_KEY);
} else {
log.error("Message send fail: ", result.cause());
}
});
ctx.json(R.success("success"));
}
}
6. 日志
2022-02-13 14:43:13.826 INFO 24 --- [ntloop-thread-1] com.lance.rabbit.service.UserService ---[ 30] : Message[exchange: hello_exchange, routeKey: hello_route_key] published success!
2022-02-13 14:43:13.893 INFO 19 --- [ntloop-thread-0] com.lance.rabbit.config.ClientHelper ---[ 67] : Got message: {"name":"Jim Green","title":"book"}
2022-02-13 14:43:13.893 INFO 19 --- [ntloop-thread-0] com.lance.rabbit.config.ClientHelper ---[ 68] : Message[exchange: hello_exchange, routeKey: hello_route_key] receive success!
7.项目完整地址
Vertx之Rabbit MQ消息发送 Github 地址
Vertx之Rabbit MQ消息发送 Gitee 地址



