栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot+mqtt+apache apollo,监听信息并可以动态更改topic

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot+mqtt+apache apollo,监听信息并可以动态更改topic

一、需求

        通过Java集成mqtt来获得设备监控到的数据,并且当设备发送mqtt的topic发生改变时,Java可以动态改变topic来继续监听设备发送的数据。

二、实现 1、新建一个demo数据库并添加几条数据来进行测试

站点设备信息表

存放监听数据表

2、创建一个springboot项目,开始编写Java代码

(1)创建springboot

            具体创建过程略,可参考文章使用IDEA创建一个springboot项目 - 码出精彩人生 - 博客园 (cnblogs.com)

(2)配置配置文件

server.port=13010
spring.datasource.url = jdbc:mysql://localhost:3306/iot-mqtt-demo?characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull
spring.datasource.username = root
spring.datasource.password = 123456
spring.datasource.driver-class-name = com.mysql.jdbc.Driver
spring.datasource.type = com.zaxxer.hikari.HikariDataSource
spring.mqtt.url = tcp://127.0.0.1:61613
consumer.client.id = iot_mqtt
spring.mqtt.username = admin
spring.mqtt.password = password
spring.mqtt.completionTimeout = 3000

(3)pom文件导入相关jar包


    4.0.0
    com.iotmqtt
    iot-mqtt-demo
    0.0.1-SNAPSHOT
    
        org.springframework.boot
        spring-boot-starter-parent
        2.0.0.RELEASE
    
    
        
        
            org.springframework.integration
            spring-integration-mqtt
        
        
            org.springframework.integration
            spring-integration-stream
        
        
            org.apache.commons
            commons-lang3
            3.0
        
        
        
            io.springfox
            springfox-swagger2
            2.7.0
        
        
            io.springfox
            springfox-swagger-ui
            2.7.0
        
        
            org.springframework.boot
            spring-boot-starter-test
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
        
            mysql
            mysql-connector-java
        
        
        
            org.mybatis.spring.boot
            mybatis-spring-boot-starter
            2.0.0
        
        
            org.mybatis
            mybatis
            3.4.1
        
        
        
            com.alibaba
            fastjson
            1.2.7
        
        
            org.apache.httpcomponents
            httpclient
            4.5.1
        
        
        
            org.apache.poi
            poi
            3.16
        
        
            org.apache.poi
            poi-ooxml
            3.16
        
        
        
            org.projectlombok
            lombok
            true
        
    
    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.1
                
                    1.8
                    1.8
                
            
        
    

(3)创建MqttConfig,配置mqtt连接,编写topic更新方法

package com.iotmqtt.iotmqttdemo.config;
import com.iotmqtt.iotmqttdemo.mapper.DemoMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Component
@Configuration
@EnableAutoConfiguration
public class MqttConfig {

    private Log log = LogFactory.getLog(MqttConfig.class);
    @Value("${spring.mqtt.username}")
    private String username;
    @Value("${spring.mqtt.password}")
    private String password;
    @Value("${spring.mqtt.url}")
    private String hostUrl;
    @Value("${consumer.client.id}")
    private String clientId;
    @Value("${spring.mqtt.completionTimeout}")
    private static int completionTimeout ;
    private static MqttClient client;
    @Autowired
    DemoMapper demoMapper;
    
    public void addToipc(String topic) throws MqttException {
        client.subscribe(topic);
    }

    
    public void removeTopic(String topic) throws MqttException {
        client.unsubscribe(topic);
    }

    public void init() throws MqttException {
        //host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
        //MemoryPersistence设置clientid的保存形式,默认为以内存保存
        if(client==null) {
            client = new MqttClient(hostUrl, clientId, new MqttDefaultFilePersistence());
        }
        MqttConnectOptions options = new MqttConnectOptions();
        //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        //这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(true);
        //设置连接的用户名
        options.setUserName(username);
        //设置连接的密码
        options.setPassword(password.toCharArray());
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(20);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        //回调
        client.setCallback(new MqttCallback() {
            @Override
            public void messageArrived(String topicName, MqttMessage message) throws Exception {
                //subscribe后得到的消息会执行到这里面
                process(topicName,message.toString());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                //publish后会执行到这里
            }

            @Override
            public void connectionLost(Throwable cause) {
                // //连接丢失后,一般在这里面进行重连
                try {
                    init();
                }catch (Exception e){
                    e.printStackTrace();
                }

            }
        });
        //链接
        client.connect(options);
        //订阅
        client.subscribe(demoMapper.getTopics());
        //取消订阅
        //client.unsubscribe(topicStr);
    }

    
    public void PushMsg(String deviceNum, String msg){
        //tpoic
        String topic = deviceNum;
        MqttMessage m=new MqttMessage();
        m.setRetained(true);
        m.setPayload(msg.getBytes());
        try {
            log.info("主题:"+topic+"-----内容:"+m);
            client.publish(topic, m);
        }catch(Exception e){
            System.out.println("发布消息失败-->"+msg);
            e.printStackTrace();
        }

    }

    
    void process(String topicName,String message){
        //处理数据代码
    }

}

(4)编写获得站点/设备topic的方法和保存数据的方法

mapper

package com.iotmqtt.iotmqttdemo.mapper;
import com.iotmqtt.iotmqttdemo.bean.SiteData;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
@Mapper
public interface DemoMapper {

    @Select("select topics from site_info")
    String[] getTopics();
    @Insert("INSERT INTO site_data (siteId,data) VALUES (#{siteId},#{data})")
    void insert(SiteData siteData);
}

(5)创建controller编写更新topic接口

package com.iotmqtt.iotmqttdemo.controller;
import com.iotmqtt.iotmqttdemo.config.MqttConfig;
import com.iotmqtt.iotmqttdemo.service.DemoService;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping({"/demo"})
public class DemoController {

    @Autowired
    DemoService demoService;
    @Autowired
    MqttConfig mqttConfig;
    @ApiOperation(value = "删除topic", notes = "")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "topic", value = "阿波罗topic", required = false, paramType = "String")
    })
    @GetMapping("/removeTopic")
    public String removeTopic(String topic) throws MqttException {
        mqttConfig.removeTopic(topic);
        return "删除成功";
    }

    @ApiOperation(value = "新增topic", notes = "")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "topic", value = "阿波罗topic", required = false, paramType = "String")
    })
    @GetMapping("/addTopic")
    public String addTopic(String topic) throws MqttException {
        mqttConfig.addToipc(topic);
        return "添加成功";
    }

}
三、测试代码 1、服务启动后,使用mqtt连接工具发送数据测试功能

(1)使用谷歌插件 MQTTlens 连接mqtt并发送数据

发送数据测试

(2)修改一个站点的topic,调用更新接口再此发送数据测试

修改1站点topic

执行更新topic接口

修改后重新发送数据

(4)检查数据更新情况

可以看到我们数据已经可以正常上来

关注公众号 Z丶learn 回复 mqttdemo 获得源码demo

微信公众号

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/322319.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号