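使用的技术:org.apache.commons.pool2、vertx-mqtt。这里使用 commons-pool2 的通用对象池对 MQTT 客户端连接进行池化,同样的方式也可以用于池化其他客户端连接。本方案暂未投入生产使用,如有问题需自行维护,欢迎指正!
源码地址:gitee源码参考地址
maven依赖
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-mqtt</artifactId>
    <version>4.1.1</version>
</dependency>
mqtt连接配置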
/**
 * MQTT client and connection-pool settings bound from configuration keys
 * under the {@code mqtt.client} prefix.
 *
 * <p>The first group of fields is copied onto {@code MqttClientOptions} by
 * {@code MqttClientFactory}; the second group configures the object pool
 * built by {@code MqttTemplate}.
 */
@Component
@ConfigurationProperties(prefix = "mqtt.client")
public class MqttClientProperties {
    // Broker host
    private String host = MqttClientOptions.DEFAULT_HOST;
    // Broker port
    private Integer port = MqttClientOptions.DEFAULT_PORT;
    // Login user name
    private String username;
    // Login password
    private String password;
    // Connection timeout (seconds)
    private Integer connectTimeout;
    // Timeout while waiting for the broker's acknowledgement
    private Integer ackTimeout = MqttClientOptions.DEFAULT_ACK_TIMEOUT;
    // Explicit client id. MqttClientFactory reads it through getClientId(), so the
    // property has to exist; it may stay null while autoGeneratedClientId is true.
    private String clientId;
    private boolean autoGeneratedClientId = true;
    private boolean autoKeepAlive;
    private boolean cleanSession;
    private boolean willFlag;
    private int willQoS;
    private boolean willRetain;
    private int keepAliveInterval = MqttClientOptions.DEFAULT_KEEP_ALIVE_INTERVAL;
    private int maxInflightQueue;
    private int maxMessageSize = MqttClientOptions.DEFAULT_MAX_MESSAGE_SIZE;
    private String willMessage;
    private String willTopic;

    // ---- Connection pool settings ----
    private int minIdle = 3;
    private int maxTotal = 8;
    private int maxIdle = 8;
    private int maxWaitMillis = 6000;
    // The borrow max-wait only takes effect when this is true
    private boolean blockWhenExhausted = true;
    // Must be true for the factory's validation to be invoked on borrow
    private boolean testOnBorrow = true;
    private boolean testOnReturn;
    private boolean testWhileIdle;
    private boolean jmxEnabled = false;
    private boolean fairness;
    // NOTE(review): declared as boolean although the name suggests a duration, and it
    // is not read anywhere in the visible code - confirm the intended type.
    private boolean maxWaitDuration;
setter...
getter...
}
mqtt连接工厂
import io.vertx.core.Vertx; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import lombok.extern.slf4j.Slf4j; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; @Slf4j public class MqttClientFactory extends BasePooledObjectFactorymqtt的连接template{ private MqttClientProperties config; public MqttClientFactory(MqttClientProperties config) { this.config = config; } @Override public MqttClient create() throws InterruptedException { MqttClientOptions options = new MqttClientOptions(); options.setMaxMessageSize(config.getMaxMessageSize()); options.setPassword(config.getPassword()); options.setUsername(config.getUsername()); options.setAckTimeout(config.getAckTimeout()); options.setAutoGeneratedClientId(config.isAutoGeneratedClientId()); options.setClientId(config.getClientId()); options.setAutoKeepAlive(config.isAutoKeepAlive()); options.setCleanSession(config.isCleanSession()); options.setKeepAliveInterval(config.getKeepAliveInterval()); options.setWillFlag(config.isWillFlag()); options.setWillQoS(config.getWillQoS()); MqttClient client = MqttClient.create(Vertx.vertx(), options); MqttClient connect = client.connect(config.getPort(), config.getHost(), r -> { if (r.succeeded()) { log.info("{}:连接成功回调",log.getName()); } if (r.failed()) { log.error("{}:连接失败回调",log.getName()); } }); return connect; } @Override public PooledObject wrap(MqttClient client) { return new DefaultPooledObject<>(client); } @Override public void destroyObject(PooledObject pooled) { if (pooled == null) { return; } MqttClient client = pooled.getObject(); if (client.isConnected()) { client.disconnect(); } } @Override public boolean validateObject(PooledObject pooled) { MqttClient client = pooled.getObject(); boolean flag = false; while (!flag){ flag = client.isConnected(); } return flag; } }
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@Slf4j
public class MqttTemplate {
private GenericObjectPool clientPool;
public MqttTemplate() {
}
public MqttTemplate(MqttClientFactory factory){
GenericObjectPoolConfig config = new GenericObjectPoolConfig<>();
config.setMinIdle(1);
config.setBlockWhenExhausted(true);
// 一定要打开,因为创建连接客户端是异步的,需要在获取的使用对客户端进行校验判断,拿到connected客户端
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
//一定要关闭jmx,不然springboot启动会报已经注册了某个jmx的错误
config.setJmxEnabled(false);
this.clientPool = new GenericObjectPool<>(factory,config);
//这里可以做一些初始化连接
}
public MqttTemplate(MqttClientFactory factory,MqttClientProperties properties){
GenericObjectPoolConfig config = new GenericObjectPoolConfig<>();
config.setMinIdle(properties.getMinIdle());
config.setBlockWhenExhausted(properties.isBlockWhenExhausted());
// 一定要打开,因为创建连接客户端是异步的,需要在获取的使用对客户端进行判断
config.setTestOnBorrow(properties.isTestOnBorrow());
config.setTestOnReturn(properties.isTestOnReturn());
config.setTestWhileIdle(properties.isTestWhileIdle());
config.setMaxIdle(properties.getMaxIdle());
config.setMaxTotal(properties.getMaxTotal());
//一定要关闭jmx,不然springboot启动会报已经注册了某个jmx的错误
config.setJmxEnabled(properties.isJmxEnabled());
this.clientPool = new GenericObjectPool<>(factory,config);
//这里可以做一些初始化连接
}
public boolean publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain){
try {
MqttClient client = clientPool.borrowObject();
if(client.isConnected()){
log.info("{}获取连接成功",log.getName());
client.publish(topic, payload, qosLevel, isDup, isRetain, r -> {
log.info("{}消息推送成功",log.getName());
// 归还客户端
clientPool.returnObject(client);
});
return true;
}else{
log.error("{}获取的客户端是断开的!",log.getName());
}
} catch (Exception e) {
log.error("{}获取连接失败:{}",log.getName(),e.getMessage());
}
return false;
}
...可以再自定义的去封装一些方法
}
mqtt连接池配置
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes the MQTT client factory and the pooled
 * {@code MqttTemplate} as beans.
 *
 * <p>NOTE(review): {@code MqttClientProperties} is itself annotated with
 * {@code @Component}; together with {@code @EnableConfigurationProperties}
 * this may register the properties twice - confirm.
 */
@Configuration
@EnableConfigurationProperties(MqttClientProperties.class)
public class MqttConfig {

    /** Bound MQTT settings, supplied through setter injection. */
    private MqttClientProperties mqttClientProperties;

    /**
     * Receives the bound MQTT settings.
     *
     * @param mqttClientProperties settings used to build the beans below
     */
    @Autowired
    public void setClientProperties(MqttClientProperties mqttClientProperties) {
        this.mqttClientProperties = mqttClientProperties;
    }

    /**
     * @return the factory that creates pooled MQTT clients from the injected settings
     */
    @Bean
    public MqttClientFactory getClientFactory() {
        MqttClientFactory clientFactory = new MqttClientFactory(mqttClientProperties);
        return clientFactory;
    }

    /**
     * @return the template, built from the client factory and the injected settings
     */
    @Bean
    public MqttTemplate getMqttTemplate() {
        MqttClientFactory clientFactory = getClientFactory();
        return new MqttTemplate(clientFactory, mqttClientProperties);
    }
}
使用
@Autowired
private MqttTemplate mqttTemplate;

/**
 * Demo endpoint: publishes a fixed payload to a fixed topic at QoS 0.
 *
 * @return the value returned by {@code MqttTemplate.publish}
 */
@GetMapping
public boolean pool() {
    final Buffer payload = Buffer.buffer("aaa");
    return mqttTemplate.publish("20220428testtopic", payload, MqttQoS.AT_MOST_ONCE, false, false);
}



