1.依赖类
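<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>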
2.application.yml 配置文件
mqtt:
hostUrl: tcp:/
private String username;
private String password;
private String hostUrl;
private String defaultTopic;
private int timeout;
private int keepAlive;
private Boolean cleanSession;
private String clientId;
private Boolean reconnect;
private Boolean isOpen;
private Integer qos;
}
4. MqttConfig 根据环境是否启动 mqtt
/**
 * Spring configuration that connects the MQTT client while the application
 * context is being built.
 */
@Configuration
public class MqttConfig {

    /** Container-managed client; the bean method below connects and returns this same instance. */
    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    /**
     * Connects the injected client and exposes it as a bean.
     *
     * <p>The bean method is guarded by {@link MqttCondition}, so it is only
     * processed when that condition matches.
     *
     * @return the injected client, after {@code connect()} has been invoked on it
     */
    @Bean
    @Conditional(MqttCondition.class)
    public MqttAcceptClient getMqttPushClient() {
        final MqttAcceptClient acceptClient = this.mqttAcceptClient;
        acceptClient.connect();
        return acceptClient;
    }
}
5. springboot 启动配置 mqtt类
/**
 * Spring {@link Condition} that matches only when the {@code mqtt.isOpen}
 * property is set to {@code true}.
 */
public class MqttCondition implements Condition {

    /**
     * Decides whether the guarded bean should be registered.
     *
     * @param context               condition context used to reach the current environment
     * @param annotatedTypemetadata metadata of the annotated element (not used)
     * @return {@code true} if {@code mqtt.isOpen} equals "true" (case-insensitive);
     *         {@code false} when the property is missing or has any other value
     */
    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypemetadata) {
        // Read the switch from the current environment.
        Environment environment = context.getEnvironment();
        String isOpen = environment.getProperty("mqtt.isOpen");
        // Boolean.parseBoolean(null) is false, so a missing property disables MQTT.
        return Boolean.parseBoolean(isOpen);
    }
}
6.mqtt客户端类
/**
 * MQTT client wrapper: builds a Paho {@link MqttClient} from {@link MqttProperties},
 * connects it, and offers subscribe / unsubscribe / reconnect helpers.
 *
 * <p>The live client is published through the static {@link #client} field so that
 * the callback class can reach it.
 */
@Component
public class MqttAcceptClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);

    /** Pause between two reconnection attempts, in milliseconds. */
    private static final long RECONNECT_INTERVAL_MS = 10000L;

    /** Most recently created client; {@code null} until {@link #connect()} has run once. */
    public static MqttClient client;

    @Autowired
    private MqttAcceptCallback mqttAcceptCallback;

    @Autowired
    private MqttProperties mqttProperties;

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    /**
     * Creates a new client from the configured properties and connects it.
     *
     * <p>Failures are logged (with stack trace) and not rethrown; callers can check
     * {@code client.isConnected()} afterwards.
     */
    public void connect() {
        try {
            MqttClient newClient = new MqttClient(
                    mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());

            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            // A broker without authentication has no password configured; skip instead of NPE.
            String password = mqttProperties.getPassword();
            if (password != null) {
                options.setPassword(password.toCharArray());
            }
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            // Boolean properties may be absent; keep the Paho defaults rather than fail on unboxing.
            Boolean reconnect = mqttProperties.getReconnect();
            if (reconnect != null) {
                options.setAutomaticReconnect(reconnect);
            }
            Boolean cleanSession = mqttProperties.getCleanSession();
            if (cleanSession != null) {
                options.setCleanSession(cleanSession);
            }

            // Publish the client before connecting: the callback reads the static field.
            MqttAcceptClient.setClient(newClient);
            // Register the callback.
            newClient.setCallback(mqttAcceptCallback);
            newClient.connect(options);
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J print the stack trace as well.
            logger.error("[客户端连接初始化异常:{}]", e.toString(), e);
        }
    }

    /**
     * Blocks until a connection is established, retrying every
     * {@value #RECONNECT_INTERVAL_MS} ms, or until the calling thread is interrupted.
     *
     * <p>Returns immediately when the current client is already connected, so an
     * active session is never replaced by a second client with the same client id.
     */
    public void reconnection() {
        try {
            while (true) {
                MqttClient stale = client;
                if (stale != null) {
                    if (stale.isConnected()) {
                        logger.info("MQTT已处于连接状态,无需重连:{}", stale);
                        return;
                    }
                    try {
                        stale.close();
                    } catch (MqttException e) {
                        // A failed close must not abort the retry loop.
                        logger.warn("关闭旧的MQTT客户端失败", e);
                    }
                }

                this.connect();

                MqttClient fresh = client;
                if (fresh != null && fresh.isConnected()) {
                    logger.info("MQTT重新连接成功:{}", fresh);
                    break;
                }
                Thread.sleep(RECONNECT_INTERVAL_MS);
            }
        } catch (InterruptedException e) {
            // Restore the flag so the owner of this thread can observe the interruption.
            Thread.currentThread().interrupt();
            logger.warn("MQTT重连被中断", e);
        }
    }

    /**
     * Subscribes to a topic.
     *
     * @param topic topic filter to subscribe to
     * @param qos   maximum quality of service for the subscription
     */
    public void subscribe(String topic, int qos) {
        logger.info("==============开始订阅主题=============={}", topic);
        MqttClient current = client;
        if (current == null) {
            logger.error("订阅主题失败,MQTT客户端尚未初始化: topic={}", topic);
            return;
        }
        try {
            current.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("订阅主题失败: topic={}, qos={}", topic, qos, e);
        }
    }

    /**
     * Cancels the subscription for a topic.
     *
     * @param topic topic filter to unsubscribe from
     */
    public void unsubscribe(String topic) {
        logger.info("==============开始取消订阅主题=============={}", topic);
        MqttClient current = client;
        if (current == null) {
            logger.error("取消订阅失败,MQTT客户端尚未初始化: topic={}", topic);
            return;
        }
        try {
            current.unsubscribe(topic);
        } catch (MqttException e) {
            logger.error("取消订阅失败: topic={}", topic, e);
        }
    }
}
7. mqtt消费端回调 类
/**
 * Paho callback for the consuming client: logs connection events and incoming
 * messages, triggers a reconnect on connection loss, and subscribes once the
 * connection is complete.
 */
@Component
public class MqttAcceptCallback implements MqttCallbackExtended {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);

    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    @Autowired
    private alertVehicleService alertVehicleService;

    /**
     * Called when the connection to the broker is lost; starts the reconnect loop
     * if the shared client is missing or disconnected.
     *
     * <p>NOTE(review): {@code reconnection()} blocks until it succeeds and runs on the
     * thread that invoked this callback. If automatic reconnect is enabled in the
     * connect options, confirm the two mechanisms do not compete.
     *
     * @param throwable reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        // Pass the cause to the logger so the reason for the disconnect is not discarded.
        logger.info("连接断开,可以做重连", throwable);
        MqttClient current = MqttAcceptClient.client;
        if (current == null || !current.isConnected()) {
            logger.info("emqx重新连接....................................................");
            mqttAcceptClient.reconnection();
        }
    }

    /**
     * Logs the topic and payload of an incoming message.
     *
     * @param topic       topic the message was published to
     * @param mqttMessage received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        logger.info("接收消息主题 : " + topic);
        // Decode explicitly as UTF-8 (same as deliveryComplete) instead of the platform charset.
        String payLoad = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
        logger.info("接收消息 : " + payLoad );
    }

    /**
     * Logs the topics, and the payload when still available, of a delivered message.
     *
     * @param token delivery token of the completed publish
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        if (topics != null) {
            for (String topic : topics) {
                logger.info("向主题:" + topic + "发送消息成功!");
            }
        }
        try {
            // Paho returns null from getMessage() once the message has been delivered.
            MqttMessage message = token.getMessage();
            if (message != null) {
                String content = new String(message.getPayload(), StandardCharsets.UTF_8);
                logger.info("消息的内容是:" + content);
            }
        } catch (MqttException e) {
            logger.error("读取已发送消息内容失败", e);
        }
    }

    /**
     * Called after a (re)connection succeeds; subscribes to the configured topic.
     *
     * @param reconnect whether this connection is the result of a reconnect
     * @param serverUri URI of the server the connection was made to
     */
    @Override
    public void connectComplete(boolean reconnect, String serverUri) {
        logger.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客户端连接成功!--------------------");
        // A topic filter ending in "/#" subscribes to every topic under that prefix,
        // e.g. "test/#" covers all topics starting with "test".
        // TODO: replace the placeholder below with your own topic name.
        mqttAcceptClient.subscribe(主题名称, 1);
    }
}



