栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot整合MQTT

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot整合MQTT

MQTT协议
  • MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
    MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
    作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。MQTT协议特点
  • MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。
  • MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。
MQTT Client
  • publisher 和 subscriber 都属于 MQTT Client,之所以有发布者和订阅者这个概念,其实是一种相对的概念,就是指当前客户端是在发布消息还是在接收消息,发布和订阅的功能也可以由同一个 MQTT Client 实现。MQTT 客户端是运行 MQTT 库并通过网络连接到 MQTT 代理的任何设备(从微控制器到成熟的服务器)。例如,MQTT 客户端可以是一个非常小的、资源受限的设备,它通过无线网络进行连接并具有一个最低限度的库。基本上,任何使用 TCP/IP 协议使用 MQTT 设备的都可以称之为 MQTT Client。MQTT 协议的客户端实现非常简单直接,易于实施是 MQTT 非常适合小型设备的原因之一。MQTT 客户端库可用于多种编程语言。例如,Android、Arduino、C、C++、C#、Go、iOS、Java、JavaScript 和 .NET。
MQTT Broker
  • 与 MQTT Client 对应的就是 MQTT Broker,Broker 是任何发布/订阅协议的核心,根据实现的不同,代理可以处理多达数百万连接的 MQTT Client。 Broker 负责接收所有消息,过滤消息,确定是哪个Client 订阅了每条消息,并将消息发送给对应的 Client,Broker 还负责保存会话数据,这些数据包括订阅的和错过的消息。Broker 还负责客户端的身份验证和授权。
MQTT Connection
  • MQTT 协议基于 TCP/IP。客户端和代理都需要有一个 TCP/IP 协议支持。 MQTT 连接始终位于
准备工作 (下载Broker服务端,相关客户端工具)
  • 服务端工具:
    • mosquitto https://mosquitto.org/download/
  • 客户端工具:
    • MQTTX https://mqttx.app/zh#download
创建SpringBoot的工程 (这里自己搭建就行了)
  • 修改pom.xml ,增加相关mqtt的依赖
      
        
            org.springframework.integration
            spring-integration-stream
        
        
            org.springframework.integration
            spring-integration-mqtt
        


    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.1.17.RELEASE
         
    
    com.huawen
    mqtt-demo
    0.0.1
    mqtt-demo
    Boot with MQTT Demo
    
        1.8
    
    
        
            org.springframework.boot
            spring-boot-starter
        

        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-integration
        
        
        
            org.springframework.integration
            spring-integration-stream
        
        
            org.springframework.integration
            spring-integration-mqtt
        
        
            org.projectlombok
            lombok
            1.18.12
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    



  • 自定义yml配置
spring:
  application:
    name: MQTT-DEMO
server:
  port: 8989
#mqtt properties
mqtt:
  #uris 可以有多个 所以是个数组
  uris:
    - tcp://127.0.0.1:1883
  clientId: mqtt_test1
  topics:
    - demo
    - test
  username: admin
  password: 123456
  timeout: 30
  keepalive: 60
  qos: 1
  • 增加config配置读取yml文件 (使用了Lombok 需要自行添加pom依赖)
package com.huawen.mqtt.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;


@Component
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfiguration {

    
    private String[] uris;

    
    private String clientId;

    
    private String[] topics;

    
    private String username;

    
    private String password;

    
    private Integer timeout;

    
    private Integer keepalive;

    
    private Integer qos;
}
  • 消费者配置
package com.huawen.mqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;


@Configuration
@Slf4j
public class MqttInBoundConfiguration {
    @Resource
    private MqttConfiguration mqttProperties;

    //==================================== 消费消息==========================================//

    
    @Bean("input")
    public MessageChannel mqttInputChannel() {
        //直连通道
        return new DirectChannel();
    }


    
    @Bean
    public MqttPahoClientFactory inClientFactory() {
        //设置连接属性
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getUris());
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        options.setConnectionTimeout(mqttProperties.getTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepalive());
        // 接受离线消息  告诉代理客户端是否要建立持久会话   false为建立持久会话
        options.setCleanSession(false);
        //设置断开后重新连接
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }


    
    @Bean
    public MessageProducer producer() {
        // Paho客户端消息驱动通道适配器,主要用来订阅主题  对inboundTopics主题进行监听
        //clientId 加后缀 不然会报retrying 不能重复
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_customer", inClientFactory(), mqttProperties.getTopics());
        adapter.setCompletionTimeout(5000);
        // Paho消息转换器
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        // 按字节接收消息
        // defaultPahoMessageConverter.setPayloadAsBytes(true);
        adapter.setConverter(defaultPahoMessageConverter);
        // 设置QoS
        adapter.setQos(mqttProperties.getQos());
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    
    @Bean
    @ServiceActivator(inputChannel = "input")
    public MessageHandler handler() {
        return message -> {
            log.info("收到的完整消息为--->{}", message);
            log.info("----------------------");
            log.info("message:" + message.getPayload());
            log.info("Id:" + message.getHeaders().getId());
            log.info("receivedQos:" + message.getHeaders().get(MqttHeaders.RECEIVED_QOS));
            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
            log.info("topic:" + topic);
            log.info("----------------------");
        };
    }
}
  • 生产者配置
package com.huawen.mqtt.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;


@Configuration
public class MqttOutBoundConfiguration {
    @Resource
    private MqttConfiguration mqttProperties;

    //==================================== 发送消息==========================================//

    
    @Bean("out")
    public MessageChannel mqttOutBoundChannel() {
        //直连通道
        return new DirectChannel();
    }


    
    @Bean
    public MqttPahoClientFactory outClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        String[] uris = mqttProperties.getUris();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(uris);
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        options.setConnectionTimeout(mqttProperties.getTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepalive());
        // 接受离线消息  告诉代理客户端是否要建立持久会话   false为建立持久会话
        options.setCleanSession(false);
        //设置断开后重新连接
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    
    @Bean
    @ServiceActivator(inputChannel = "out")
    public MessageHandler mqttOutbound() {
        //发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
        //clientId 加后缀 不然会报retrying 不能重复
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "_producer", outClientFactory());
        //如果设置成true,即异步,发送消息时将不会阻塞。
        messageHandler.setAsync(true);
        //设置默认QoS
        messageHandler.setDefaultQos(mqttProperties.getQos());
        // Paho消息转换器
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        //发送默认按字节类型发送消息
//        defaultPahoMessageConverter.setPayloadAsBytes(true);
        messageHandler.setConverter(defaultPahoMessageConverter);
        return messageHandler;
    }
}
  • 创建一个通用接口 用于发送数据
package com.huawen.mqtt.inter;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;


@MessagingGateway(defaultRequestChannel = "out")
public interface MqttGateway {
    
    void sendToMqtt(String payload);

    
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}
  • 生产者测试controller
package com.huawen.mqtt.controller;

import com.huawen.mqtt.bean.MyMessage;
import com.huawen.mqtt.inter.MqttGateway;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;


@RestController
public class MqttPublishController {
    @Resource
    private MqttGateway mqttGateWay;

    @PostMapping("/send")
    public String send(@RequestBody MyMessage myMessage) {
        // 发送消息到指定主题
        mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());
        return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();
    }
}

源码地址:https://github.com/KyrieXJL/MQTT_Demo

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/863090.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号