- RabbitMQ Management HTTP API的简单封装
- 官方相关
- 代码展示
官方文档:https://rawcdn.githack.com/rabbitmq/rabbitmq-server/v3.10.0/deps/rabbitmq_management/priv/www/api/index.html
支持版本:rabbitmq-v3.10.0
介绍翻译:所有URI将仅服务于application/json类型的资源,并且将需要HTTP Basic Auth认证。许多URI要求虚拟主机的名称作为路径的一部分,因为名称只能唯一地标识虚拟主机中的对象。由于默认虚拟主机名为"/",因此需要将其编码为"%2F"。传参的JSON对象必须具有某些必选参数,可选参数是可以被忽略的。缺少必选参数将导致请求错误。
部分文档截图:
代码展示基于Spring框架以及Hutool,Fastjson等工具依赖
配置文件bootstrap.yml,仅供参考
rms:
  username: admin
  password: admin
  managementUrl: http://192.168.200.250:15672
配置类RmsConfig.java
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import lombok.Data;
/**
 * Holder for the RabbitMQ management settings, bound from configuration
 * properties under the {@code rms} prefix.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "rms", ignoreInvalidFields = true)
public class RmsConfig {
/** Base URL of the management endpoint; API paths such as {@code /api/users/...} are appended to it. */
private String managementUrl;
/** User name sent as the HTTP Basic Auth identity on management API calls. */
private String username;
/** Password sent with {@link #username} for HTTP Basic Auth. */
private String password;
}
RabbitMQ管理API工具类RabbitmqHttpApi.java,这里拿创建用户和用户授权做例子,其他接口可以举一反三实现
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpStatus;
@DependsOn("rmsConfig")
@Component
public class RabbitmqHttpApi {
private static String managementUrl;
private static String adminUsername;
private static String adminPassword;
@Autowired
public void setMqttConfig(RmsConfig rmsConfig) {
RabbitmqHttpApi.managementUrl = rmsConfig.getManagementUrl();
RabbitmqHttpApi.adminUsername = rmsConfig.getUsername();
RabbitmqHttpApi.adminPassword = rmsConfig.getPassword();
}
public enum Tags {
ADMINISTRATOR("administrator"),
MONITORING("monitoring"),
POLICYMAKER("policymaker"),
MANAGEMENT("management"),
IMPERSONATOR("impersonator"),
NONE("");
private final String text;
private Tags(String text) {
this.text = text;
}
}
public static boolean createCommonUser(String username, String password) throws Exception {
return createUser(username, password, Tags.NONE);
}
public static boolean createAdminUser(String username, String password) throws Exception {
return createUser(username, password, Tags.ADMINISTRATOR);
}
public static boolean createUser(String username, String password, Tags tags) throws Exception {
String param = String.format("{"password":"%s","tags":"%s"}", password, tags.text);
String url = String.format("%s/api/users/%s", managementUrl, username);
HttpResponse response = HttpRequest.put(url).basicAuth(adminUsername, adminPassword).body(param).execute();
return response.getStatus() == HttpStatus.HTTP_CREATED || response.getStatus() == HttpStatus.HTTP_NO_CONTENT;
}
public static boolean setAllPermissions(String username) throws Exception {
return setPermissions(username, "/", ".*", ".*", ".*");
}
public static boolean setPermissions(String username, String virtualHost, String configureRegexp,
String writeRegexp, String readRegexp) throws Exception {
String param = String.format("{"configure":".*","write":".*","read":".*"}", configureRegexp,
writeRegexp, readRegexp);
virtualHost = StrUtil.equals("/", virtualHost) ? "%2F" : virtualHost;
String url = managementUrl + "/api/permissions/" + virtualHost + "/" + username;
HttpResponse response = HttpRequest.put(url).basicAuth(adminUsername, adminPassword).body(param).execute();
return response.getStatus() == HttpStatus.HTTP_CREATED || response.getStatus() == HttpStatus.HTTP_NO_CONTENT;
}
public static Integer[] getUsedSockets() throws Exception {
String url = managementUrl + "/api/nodes";
HttpResponse response = HttpRequest.get(url).basicAuth(adminUsername, adminPassword).execute();
Object object = JSONPath.read(response.body(), "$['sockets_used']");
return ObjectUtil.isNotNull(object) ? Convert.toIntArray(object) : new Integer[] {};
}
}
在业务中调用,仅供参考
/**
 * Example business flow: tries to create a RabbitMQ client user, then asks
 * {@code dispatchNode()} for a node index and logs it (1-based) when one is returned.
 *
 * @param username name of the user to create
 * @param password password of the user to create
 */
public void doBiz(String username, String password) {
    boolean created = createClientUser(username, password);
    if (created) {
        logger.info("创建rabbitmq用户【{}】成功", username);
    }

    Integer leastLoaded = dispatchNode();
    if (ObjectUtil.isNull(leastLoaded)) {
        return;
    }
    // The index is zero-based; the log shows a one-based node number.
    logger.info("节点【{}】连接数最少,进行调度", leastLoaded + 1);
}
/**
 * Creates a tag-less user and, only if that succeeds, grants it all permissions.
 * Any exception from either call is logged (message only) and reported as failure.
 *
 * @param username name of the user to create
 * @param password password of the user to create
 * @return {@code true} when both calls return {@code true}; {@code false} otherwise
 */
public boolean createClientUser(String username, String password) {
    boolean ready;
    try {
        ready = RabbitmqHttpApi.createCommonUser(username, password);
        if (ready) {
            // Permissions are only attempted once the user exists.
            ready = RabbitmqHttpApi.setAllPermissions(username);
        }
    } catch (Exception e) {
        logger.error("创建rabbitmq用户【{}】异常:{}", username, e.getMessage());
        ready = false;
    }
    return ready;
}
/**
 * Finds the position of the smallest value in the array returned by
 * {@code RabbitmqHttpApi.getUsedSockets()}.
 *
 * @return zero-based index of the minimum value, or {@code null} when the array is
 *         empty or any exception occurs (both cases are logged)
 */
public Integer dispatchNode() {
    try {
        Integer[] usedSockets = RabbitmqHttpApi.getUsedSockets();
        if (usedSockets.length == 0) {
            // Log directly instead of throwing a raw Exception just to catch it below;
            // the logged text and the returned value are the same as before.
            logger.error("调度mqtt节点异常:{}", "接口未获取到集群各节点socket使用数量");
            return null;
        }
        return ArrayUtil.indexOf(usedSockets, NumberUtil.min(usedSockets));
    } catch (Exception e) {
        logger.error("调度mqtt节点异常:{}", e.getMessage());
        return null;
    }
}



