Vert.x: Sending Messages with RabbitMQ

Introduction

The Vert.x RabbitMQ client is a Vert.x client that lets an application interact with a RabbitMQ broker over AMQP 0.9.1.

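1. Maven dependencies

	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-web</artifactId>
	</dependency>
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-rabbitmq-client</artifactId>
	</dependency>
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-config-yaml</artifactId>
	</dependency>
	<dependency>
		<groupId>com.fasterxml.jackson.core</groupId>
		<artifactId>jackson-databind</artifactId>
	</dependency>
	<dependency>
		<groupId>com.lance.common</groupId>
		<artifactId>vertx-common-core</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</dependency>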

2. YAML configuration
server:
  host: 127.0.0.1
  port: 18005

rabbit:
  host: 127.0.0.1
  port: 18003
  username: root
  password: root
  virtualHost: /root
  connectionTimeout: 6000
  requestedHeartbeat: 60
  handshakeTimeout: 6000
  requestedChannelMax: 5
  networkRecoveryInterval: 500
  automaticRecoveryEnabled: true
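
The `rabbit` block above is later converted with `JsonObject.mapTo` into a `ConfigProperties.MqProperties` object, a class the article never shows. The following is a minimal sketch of what such a class could look like, assuming plain getters and setters named after the YAML keys so that Jackson can bind them. The field types are assumptions chosen to fit the `RabbitMQOptions` setters used in the client setup. `reconnectAttempts` and `reconnectInterval` are read by the client code but are absent from the YAML, so with primitive fields they would stay at 0.

```java
// Hypothetical stand-in for the article's ConfigProperties.MqProperties (not shown in the source).
// Property names mirror the keys of the `rabbit` YAML block so Jackson can bind them by name.
final class ConfigProperties {
  private ConfigProperties() {}

  public static class MqProperties {
    private String host;
    private int port;
    private String username;
    private String password;
    private String virtualHost;
    private int connectionTimeout;
    private int requestedHeartbeat;
    private int handshakeTimeout;
    private int requestedChannelMax;
    private long networkRecoveryInterval;
    private boolean automaticRecoveryEnabled;
    // Not present in the YAML: primitive fields keep their Java default of 0.
    private int reconnectAttempts;
    private long reconnectInterval;

    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public String getVirtualHost() { return virtualHost; }
    public void setVirtualHost(String virtualHost) { this.virtualHost = virtualHost; }
    public int getConnectionTimeout() { return connectionTimeout; }
    public void setConnectionTimeout(int v) { this.connectionTimeout = v; }
    public int getRequestedHeartbeat() { return requestedHeartbeat; }
    public void setRequestedHeartbeat(int v) { this.requestedHeartbeat = v; }
    public int getHandshakeTimeout() { return handshakeTimeout; }
    public void setHandshakeTimeout(int v) { this.handshakeTimeout = v; }
    public int getRequestedChannelMax() { return requestedChannelMax; }
    public void setRequestedChannelMax(int v) { this.requestedChannelMax = v; }
    public long getNetworkRecoveryInterval() { return networkRecoveryInterval; }
    public void setNetworkRecoveryInterval(long v) { this.networkRecoveryInterval = v; }
    public boolean isAutomaticRecoveryEnabled() { return automaticRecoveryEnabled; }
    public void setAutomaticRecoveryEnabled(boolean v) { this.automaticRecoveryEnabled = v; }
    public int getReconnectAttempts() { return reconnectAttempts; }
    public void setReconnectAttempts(int v) { this.reconnectAttempts = v; }
    public long getReconnectInterval() { return reconnectInterval; }
    public void setReconnectInterval(long v) { this.reconnectInterval = v; }
  }
}
```

If the reconnect behaviour matters, it is probably worth adding `reconnectAttempts` and `reconnectInterval` to the YAML explicitly rather than relying on whatever default the properties class happens to carry.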
3. Load the configuration file at startup and pass it on as the verticle config
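import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

public class RabbitmqApplication {

  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    ConfigRetriever retriever = readYaml(vertx);

    retriever.getConfig(ar -> {
      // Stop early if the configuration could not be loaded
      if (ar.failed()) {
        System.err.println("Failed to load application.yaml: " + ar.cause().getMessage());
        vertx.close();
        return;
      }
      JsonObject config = ar.result();

      // Create and start the shared RabbitMQ client from the "rabbit" section
      ClientHelper clientHelper = new ClientHelper(config.getJsonObject("rabbit"), vertx);
      clientHelper.afterPropertiesSet();

      // Hand the whole configuration to the main verticle
      DeploymentOptions options = new DeploymentOptions().setConfig(config);
      vertx.deployVerticle(MainApp.class.getName(), options);
    });
  }

  // Builds a retriever that reads application.yaml using the YAML format from vertx-config-yaml
  private static ConfigRetriever readYaml(Vertx vertx) {
    ConfigStoreOptions store = new ConfigStoreOptions()
        .setType("file")
        .setFormat("yaml")
        .setOptional(true)
        .setConfig(new JsonObject().put("path", "application.yaml"));

    return ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(store));
  }
}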
4. RabbitMQ client connection setup
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQOptions;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

// ConfigProperties and MqConst are project classes that the article does not show.
// The Lombok annotations below are assumed: they supply the `log` field and the
// (JsonObject, Vertx) constructor that the rest of the code relies on.
@Slf4j
@RequiredArgsConstructor
public class ClientHelper {
  private final JsonObject object;
  private final Vertx vertx;
  @Getter
  private static RabbitMQClient client;

  // Builds the client from the "rabbit" configuration section and connects to the broker
  public void afterPropertiesSet() {
    ConfigProperties.MqProperties prop = object.mapTo(ConfigProperties.MqProperties.class);

    RabbitMQOptions config = new RabbitMQOptions();
    // Each parameter is optional
    // The default value will be used if the parameter is not set
    config.setUser(prop.getUsername());
    config.setPassword(prop.getPassword());
    config.setHost(prop.getHost());
    config.setPort(prop.getPort());
    config.setVirtualHost(prop.getVirtualHost());
    config.setReconnectAttempts(prop.getReconnectAttempts());
    config.setReconnectInterval(prop.getReconnectInterval());
    config.setConnectionTimeout(prop.getConnectionTimeout());
    config.setRequestedHeartbeat(prop.getRequestedHeartbeat());
    config.setHandshakeTimeout(prop.getHandshakeTimeout());
    config.setRequestedChannelMax(prop.getRequestedChannelMax());
    config.setNetworkRecoveryInterval(prop.getNetworkRecoveryInterval());
    config.setAutomaticRecoveryEnabled(prop.isAutomaticRecoveryEnabled());
    client = RabbitMQClient.create(vertx, config);

    // Connect, then start consuming once the connection is up
    client.start(asyncResult -> {
      if (asyncResult.succeeded()) {
        log.info("RabbitMQ successfully connected!");
        consumer();
      } else {
        log.error("Failed to connect to RabbitMQ", asyncResult.cause());
      }
    });
  }

  private void consumer() {
    // The first argument of basicConsumer is a queue name. This code therefore expects a queue
    // named after HELLO_ROUTING_KEY to exist already and to be bound to HELLO_EXCHANGE.
    client.basicConsumer(MqConst.HELLO_ROUTING_KEY, result -> {
      if (result.succeeded()) {
        RabbitMQConsumer mqConsumer = result.result();
        mqConsumer.handler(message -> {
          log.info("Got message: {}", message.body().toString());
          log.info("Message[exchange: {}, routeKey: {}] receive success!", message.envelope().getExchange(), message.envelope().getRoutingKey());
        });
      } else {
        log.error("===>Queue receive fail: ", result.cause());
      }
    });
  }
}
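
As an alternative to setting user, password, host, port and virtual host one by one, `RabbitMQOptions` also accepts a single AMQP connection URI through `setUri`. The sketch below shows how such a URI could be assembled from the same values. The `AmqpUri` helper is hypothetical and not part of the article or of Vert.x. The point it illustrates is that a virtual host such as `/root` contains a slash and has to be percent-encoded when it is placed in the URI path.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not from the article): builds an AMQP 0.9.1 connection URI.
final class AmqpUri {
  private AmqpUri() {}

  // Percent-encodes one URI component. URLEncoder performs form encoding and
  // writes a space as '+', so that case is rewritten to %20 for use in a URI.
  static String encode(String value) {
    return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
  }

  // amqp://user:password@host:port/vhost, with user, password and vhost encoded
  static String build(String user, String password, String host, int port, String virtualHost) {
    return "amqp://" + encode(user) + ":" + encode(password)
        + "@" + host + ":" + port + "/" + encode(virtualHost);
  }

  public static void main(String[] args) {
    // Values taken from the YAML configuration in this article
    System.out.println(build("root", "root", "127.0.0.1", 18003, "/root"));
    // prints amqp://root:root@127.0.0.1:18003/%2Froot
  }
}
```

The resulting string could then be passed to `config.setUri(...)` in place of the individual connection setters. The remaining tuning options (heartbeat, timeouts, recovery) would still be set separately.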
5. Message publishing example
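import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;

// UserVo, R, MqConst and ClientHelper are project classes; @Slf4j is assumed to supply `log`.
@Slf4j
public class UserService {

  // Publishes the JSON request body to the exchange. The route needs a BodyHandler
  // so that the body is available when this handler runs.
  public void sendMessage(RoutingContext ctx) {
    UserVo userVo = ctx.getBodyAsJson().mapTo(UserVo.class);

    Buffer message = Buffer.buffer(Json.encode(userVo));
    ClientHelper.getClient().basicPublish(MqConst.HELLO_EXCHANGE, MqConst.HELLO_ROUTING_KEY, message, result -> {
      if (result.succeeded()) {
        log.info("Message[exchange: {}, routeKey: {}] published success!", MqConst.HELLO_EXCHANGE, MqConst.HELLO_ROUTING_KEY);
      } else {
        log.error("Message send fail: ", result.cause());
      }
    });

    // The HTTP response is sent right away, without waiting for the publish result
    ctx.json(R.success("success"));
  }
}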
6. Logs
2022-02-13 14:43:13.826  INFO 24 --- [ntloop-thread-1] com.lance.rabbit.service.UserService    ---[  30] : Message[exchange: hello_exchange, routeKey: hello_route_key] published success!
2022-02-13 14:43:13.893  INFO 19 --- [ntloop-thread-0] com.lance.rabbit.config.ClientHelper    ---[  67] : Got message: {"name":"Jim Green","title":"book"}
2022-02-13 14:43:13.893  INFO 19 --- [ntloop-thread-0] com.lance.rabbit.config.ClientHelper    ---[  68] : Message[exchange: hello_exchange, routeKey: hello_route_key] receive success!
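
The log lines above reveal the values behind two classes the article uses but does not show: the exchange and routing key held in `MqConst`, and the `name` and `title` fields of `UserVo`. A minimal sketch of both, reconstructed from that output, could look like this. The constant values and field names come from the log; everything else (class layout, accessors) is an assumption.

```java
// Hypothetical reconstruction of MqConst; the two values are the ones printed in the log.
final class MqConst {
  static final String HELLO_EXCHANGE = "hello_exchange";
  static final String HELLO_ROUTING_KEY = "hello_route_key";

  private MqConst() {}
}

// Hypothetical reconstruction of UserVo; the logged payload is {"name":"Jim Green","title":"book"}.
class UserVo {
  private String name;
  private String title;

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
  public String getTitle() { return title; }
  public void setTitle(String title) { this.title = title; }
}
```

Because the consumer subscribes with `MqConst.HELLO_ROUTING_KEY` as its queue name, the queue in this setup would also be called `hello_route_key`.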
7. Full project source

Vert.x: Sending Messages with RabbitMQ, GitHub repository

Vert.x: Sending Messages with RabbitMQ, Gitee repository

When reposting, please credit the source: this article is reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/734123.html