通过Java集成mqtt来获得设备监控到的数据,并且当设备发送mqtt的topic发生改变时,Java可以动态改变topic来继续监听设备发送的数据。
二、实现 1、新建一个demo数据库并添加几条数据来进行测试站点设备信息表
存放监听数据表
2、创建一个springboot项目,开始编写Java代码(1)创建springboot
具体创建过程略,可参考文章使用IDEA创建一个springboot项目 - 码出精彩人生 - 博客园 (cnblogs.com)
(2)配置配置文件
server.port=13010 spring.datasource.url = jdbc:mysql://localhost:3306/iot-mqtt-demo?characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull spring.datasource.username = root spring.datasource.password = 123456 spring.datasource.driver-class-name = com.mysql.jdbc.Driver spring.datasource.type = com.zaxxer.hikari.HikariDataSource spring.mqtt.url = tcp://127.0.0.1:61613 consumer.client.id = iot_mqtt spring.mqtt.username = admin spring.mqtt.password = password spring.mqtt.completionTimeout = 3000
(3)pom文件导入相关jar包
4.0.0 com.iotmqtt iot-mqtt-demo0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent2.0.0.RELEASE org.springframework.integration spring-integration-mqttorg.springframework.integration spring-integration-streamorg.apache.commons commons-lang33.0 io.springfox springfox-swagger22.7.0 io.springfox springfox-swagger-ui2.7.0 org.springframework.boot spring-boot-starter-testorg.springframework.boot spring-boot-starter-webmysql mysql-connector-javaorg.mybatis.spring.boot mybatis-spring-boot-starter2.0.0 org.mybatis mybatis3.4.1 com.alibaba fastjson1.2.7 org.apache.httpcomponents httpclient4.5.1 org.apache.poi poi3.16 org.apache.poi poi-ooxml3.16 org.projectlombok lomboktrue org.springframework.boot spring-boot-maven-pluginorg.apache.maven.plugins maven-compiler-plugin3.1 1.8 1.8
(3)创建MqttConfig,配置mqtt连接,编写topic更新方法
package com.iotmqtt.iotmqttdemo.config;
import com.iotmqtt.iotmqttdemo.mapper.DemoMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Component
@Configuration
@EnableAutoConfiguration
public class MqttConfig {
private Log log = LogFactory.getLog(MqttConfig.class);
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${consumer.client.id}")
private String clientId;
@Value("${spring.mqtt.completionTimeout}")
private static int completionTimeout ;
private static MqttClient client;
@Autowired
DemoMapper demoMapper;
public void addToipc(String topic) throws MqttException {
client.subscribe(topic);
}
public void removeTopic(String topic) throws MqttException {
client.unsubscribe(topic);
}
public void init() throws MqttException {
//host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
//MemoryPersistence设置clientid的保存形式,默认为以内存保存
if(client==null) {
client = new MqttClient(hostUrl, clientId, new MqttDefaultFilePersistence());
}
MqttConnectOptions options = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
//这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
//设置连接的用户名
options.setUserName(username);
//设置连接的密码
options.setPassword(password.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(20);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
//回调
client.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topicName, MqttMessage message) throws Exception {
//subscribe后得到的消息会执行到这里面
process(topicName,message.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
}
@Override
public void connectionLost(Throwable cause) {
// //连接丢失后,一般在这里面进行重连
try {
init();
}catch (Exception e){
e.printStackTrace();
}
}
});
//链接
client.connect(options);
//订阅
client.subscribe(demoMapper.getTopics());
//取消订阅
//client.unsubscribe(topicStr);
}
public void PushMsg(String deviceNum, String msg){
//tpoic
String topic = deviceNum;
MqttMessage m=new MqttMessage();
m.setRetained(true);
m.setPayload(msg.getBytes());
try {
log.info("主题:"+topic+"-----内容:"+m);
client.publish(topic, m);
}catch(Exception e){
System.out.println("发布消息失败-->"+msg);
e.printStackTrace();
}
}
void process(String topicName,String message){
//处理数据代码
}
}
(4)编写获得站点/设备topic的方法和保存数据的方法
mapper
package com.iotmqtt.iotmqttdemo.mapper;
import com.iotmqtt.iotmqttdemo.bean.SiteData;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
@Mapper
public interface DemoMapper {
@Select("select topics from site_info")
String[] getTopics();
@Insert("INSERT INTO site_data (siteId,data) VALUES (#{siteId},#{data})")
void insert(SiteData siteData);
}
(5)创建controller编写更新topic接口
package com.iotmqtt.iotmqttdemo.controller;
import com.iotmqtt.iotmqttdemo.config.MqttConfig;
import com.iotmqtt.iotmqttdemo.service.DemoService;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping({"/demo"})
public class DemoController {
@Autowired
DemoService demoService;
@Autowired
MqttConfig mqttConfig;
@ApiOperation(value = "删除topic", notes = "")
@ApiImplicitParams({
@ApiImplicitParam(name = "topic", value = "阿波罗topic", required = false, paramType = "String")
})
@GetMapping("/removeTopic")
public String removeTopic(String topic) throws MqttException {
mqttConfig.removeTopic(topic);
return "删除成功";
}
@ApiOperation(value = "新增topic", notes = "")
@ApiImplicitParams({
@ApiImplicitParam(name = "topic", value = "阿波罗topic", required = false, paramType = "String")
})
@GetMapping("/addTopic")
public String addTopic(String topic) throws MqttException {
mqttConfig.addToipc(topic);
return "添加成功";
}
}
三、测试代码
1、服务启动后,使用mqtt连接工具发送数据测试功能
(1)使用谷歌插件 MQTTlens 连接mqtt并发送数据
发送数据测试
(2)修改一个站点的topic,调用更新接口再此发送数据测试
修改1站点topic
执行更新topic接口
修改后重新发送数据
(4)检查数据更新情况
可以看到我们数据已经可以正常上来
关注公众号 Z丶learn 回复 mqttdemo 获得源码demo
微信公众号



