栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot-整合mqtt

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot-整合mqtt

pom和config
 		
            org.springframework.integration
            spring-integration-stream
        
        
            org.springframework.integration
            spring-integration-mqtt
        

MQTTConfig

package com.example.config;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Objects;


@Configuration
@IntegrationComponentScan //@MessagingGateway 注解搜索指定的集成注解. 在我们的示例中,它将会扫描到使用@MessagingGateway注解的词条网关.
@Slf4j
@Getter
@Setter
public class MqttConfig {

    public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";

    public static final String INPUT_CHANNEL = "mqttInputChannel";

    public static final String SUB_TOPICS = "PSimulation,Pressure,PSimulationPump,PSimulationPressure," +
            "PSimulationValve,PSimulationFlow,FSimulation,FSimulationPump,FSimulationPressure," +
            "FSimulationValve,FSimulationFlow,leak,blast";

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.server}")
    private String hostUrl;

    @Value("${mqtt.client.id}")
    private String clientId;

    @Value("${mqtt.topic}")
    private String defaultTopic;

    @PostConstruct // 执行顺序 Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
    public void init() {

        log.error("username:{} password:{} hostUrl:{} clientId :{} ===={} ",
                this.username, this.password, this.hostUrl, this.clientId, this.defaultTopic);
    }


    
    @Bean
    public MqttPahoClientFactory clientFactory() {
        // 可以理解为mqtt 的连接队列
        final MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{hostUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        // 客户端
        final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    // 以下是发布消息

    
    @Bean(value = OUTBOUND_CHANNEL)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    //ServiceActivator注解表明当前方法用于处理MQTT消息,OUTBOUND_CHANNEL参数指定了用于接收消息信息的channel。
    public MessageHandler mqttOutbound() {

        // 创建工厂 客户端id 和 工厂
        final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());
        // 业务质量
        handler.setDefaultQos(1);
        // 消息保留
        handler.setDefaultRetained(false);
        // 发送的默认主题
        handler.setDefaultTopic(defaultTopic);
        // 阻塞
        handler.setAsync(false);
        //当async和async-events都为true时,发出MqttMessageSentEvent事件,包含消息、主题以及由客户端库生成的消息id,客户端id和客户端实例(每次客户端连接增加)。当传送由客户端库确认,发出MqttMessageDeliveredEvent,包含消息号、客户端号和客户端实例,使传送与发送相关联。这些事件可以由任意ApplicationListener接收,或者通过事件inbound通道适配器。注意:在MqttMessageSentEvent之前可能会接收到MqttMessageDeliveredEvent。默认值为false。
        handler.setAsyncEvents(false);
        return handler;
    }

    @Service
    @MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
    //@MessagingGateway是一个用于提供消息网关代理整合的注解,参数defaultRequestChannel指定发送消息绑定的channel。 表示对外暴露发送的方法
    public interface MqttGateway {


        void sendToMqtt(String payload);

        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    }
    // 以下是订阅消息

    
    //接收通道
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    //配置client,监听的topic
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                // 三个参数, 客户端id  工厂  监听主题列表
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId + "_inbound", clientFactory(), SUB_TOPICS.split(","));
        // 发送超时 如果通道可能会阻塞,才会运用
        adapter.setCompletionTimeout(3000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // 通道
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    //通过通道获取数据
    @Bean
    @ServiceActivator(inputChannel = INPUT_CHANNEL)
    public MessageHandler handler() {
        return message -> {
            String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
            log.info("topic: {}", topic);
            String[] topics = SUB_TOPICS.split(",");
            for (String t : topics) {
                if (t.equals(topic)) {
                    log.info("payload: {}", message.getPayload().toString());
                }
            }
        };
    }
}

bean

以下是借鉴了别人的Spring整合MQTT


 

1)客户端id
2)代理URL
3)适配器会接受到消息的一组以逗号分隔的主题
4)以逗号分隔的一组QoS值,可以是所有主题运用单一值,或者每一个主题一个值(列表必须同样长度)
5)MqttMessageConverter(可选项),默认DefaultPahoMessageConverter生成消息带字符串载荷(默认),携带头部包括:
mqtt_topic 接收消息主题
mqtt_duplicate 如果消息重复,值为true
mqtt_qos 业务质量
DefaultPahoMessageConverter可配置为返回载荷原始byte[]类型,通过将其声明为一个实体类,并且设定payloadAsBytes属性
6)客户端工厂
7)发送超时-如果通道可能会阻塞,才会运用(例如当前已满的边界QueueChannel)
8)错误通道–如果使用的话,ErrorMessage消息下行异常会发送至该通道,载荷为MessagingException,包含错误消息与原因
9)恢复间隔–控制在故障之后适配器会尝试重新连接的时间间隔,默认为10000ms(10s)
从4.1版本开始,编程方式改变适配器订阅的主题可以省略url,DefaultMqttPahoClientFactory属性serverURIs可以提供服务端URI,例如,这将使能连接至HA高可用簇。
从4.2.2版本开始,当适配器成功订阅至主题后,发布MqttSubscribedEvent,当连接/订阅失败时,发布MqttConnectionFailedEvent。这些事件可以由实现ApplicationListener接口的实体类获取。
新的属性recoveryInterval控制在故障之后适配器会尝试重新连接的时间间隔,默认为10000ms(10s)
在4.2.3版本之前,当适配器停止后,客户端总是会解除订阅。这是不正确的,因为如果客户端QoS大于0,我们需要保持订阅以便适配器停止时到达的消息在下一次开始时会传送。这也需要设置客户端工厂cleanSession属性为false,默认值为true。
从4.2.3版本开始,适配器不会解除订阅(默认),如果cleanSession值为false。可以重写该行为,通过设置工厂属性consumerCloseAction,可以有以下值:UNSUBSCRIBE_ALWAYS, UNSUBSCRIBE_NEVER以及UNSUBSCRIBE_CLEAN,后者(默认)会解除订阅仅当cleanSession属性值为true。
回退至4.2.3之前的行为,使用UNSUBSCRIBE_ALWAYS。


1)客户端id
2) 代理URL
3)MqttMessageConverter(可选),默认DefaultPahoMessageConverter识别以下头部:
mqtt_topic 消息发送主题
mqtt_retained 如果消息保留的话,值为true
mqtt_qos 业务质量
4)客户端工厂
5)默认业务质量(用于未发现mqtt_qos头部的情况),如果自定义converter提供的话,不允许采用
6)保留标记符默认值(用于未发现mqtt_retained头部的情况),如果自定义converter提供的话,不允许采用
7)消息发送默认主题(用于未发现mqtt_topic头部的情况)
8)当为true,当发送消息时,调用者不会阻塞等待传送确认,默认值:false(发送阻塞直到传送确认)
9)当async和async-events都为true时,发出MqttMessageSentEvent事件,包含消息、主题以及由客户端库生成的消息id,客户端id和客户端实例(每次客户端连接增加)。当传送由客户端库确认,发出MqttMessageDeliveredEvent,包含消息号、客户端号和客户端实例,使传送与发送相关联。这些事件可以由任意ApplicationListener接收,或者通过事件inbound通道适配器。注意:在MqttMessageSentEvent之前可能会接收到MqttMessageDeliveredEvent。默认值为false。

从版本4.1开始,可以省略url,DefaultMqttPahoClientFactory属性serverURIs可以提供服务器URI。例如,这将使能连接至HA高可用簇。
Java

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/352244.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号