Spring Boot, MQTT, and EMQX


1. Dependencies

  
        
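        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>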
        

2. application.yml configuration file

mqtt:
  hostUrl: tcp:/
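
3. MqttProperties configuration properties class

// Assumed declaration: the class header was lost in the source. The class name comes from
// its use in MqttAcceptClient and the "mqtt" prefix from application.yml. Getters and
// setters (called as getHostUrl() etc. in section 6) must also be provided.
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    // Broker user name
    private String username;

    // Broker password
    private String password;

    // Broker address
    private String hostUrl;

    // Default topic
    private String defaultTopic;

    // Connection timeout, passed to setConnectionTimeout (seconds)
    private int timeout;

    // Keep-alive interval, passed to setKeepAliveInterval (seconds)
    private int keepAlive;

    // Whether to start with a clean session
    private Boolean cleanSession;

    // Client ID
    private String clientId;

    // Whether the client reconnects automatically
    private Boolean reconnect;

    // Whether MQTT is started at all (read by MqttCondition as mqtt.isOpen)
    private Boolean isOpen;

    // Default QoS level
    private Integer qos;
}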
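
Because a wrong broker address or QoS value only surfaces when the client tries to connect, it can help to check the settings up front. The following is a minimal sketch, not part of the original article: the class `MqttSettingsCheck`, its method names, the accepted scheme list (`tcp`, `ssl`, `ws`, `wss`), and the sample address `tcp://127.0.0.1:1883` are all assumptions for illustration. Check which schemes your client version actually accepts.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Set;

// Hypothetical helper for sanity-checking values bound from application.yml.
final class MqttSettingsCheck {

    // Assumed list of URI schemes; adjust to what your MQTT client supports.
    private static final Set<String> ASSUMED_SCHEMES = Set.of("tcp", "ssl", "ws", "wss");

    // MQTT defines exactly three QoS levels: 0, 1 and 2.
    static boolean isValidQos(int qos) {
        return qos >= 0 && qos <= 2;
    }

    // Accepts only URIs that have a known scheme and a host part.
    static boolean isValidHostUrl(String hostUrl) {
        if (hostUrl == null) {
            return false;
        }
        try {
            URI uri = new URI(hostUrl);
            return uri.getScheme() != null
                    && ASSUMED_SCHEMES.contains(uri.getScheme())
                    && uri.getHost() != null;
        } catch (URISyntaxException e) {
            return false;
        }
    }
}
```

For example, `MqttSettingsCheck.isValidHostUrl("tcp://127.0.0.1:1883")` passes, while a value with no host part is rejected.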

4. MqttConfig: start MQTT depending on the environment

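import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConfig {
    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    // Connect to the broker at startup, but only when MqttCondition matches (mqtt.isOpen is true)
    @Conditional(MqttCondition.class)
    @Bean
    public MqttAcceptClient getMqttPushClient() {
        mqttAcceptClient.connect();
        return mqttAcceptClient;
    }

}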

5. Condition class that controls whether Spring Boot starts MQTT

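import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class MqttCondition implements Condition {

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {

        // Read the switch from the current environment; a missing property parses as false
        Environment environment = context.getEnvironment();
        String isOpen = environment.getProperty("mqtt.isOpen");
        return Boolean.parseBoolean(isOpen);
    }
}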
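
The condition relies on `Boolean.parseBoolean`, which returns `true` only for the string `true` (ignoring case), so MQTT stays off when `mqtt.isOpen` is missing or holds any other value. A minimal sketch of that behavior, using a plain `Map` as a stand-in for Spring's `Environment` (the class `MqttSwitchDemo` and method `isMqttEnabled` are hypothetical names for illustration):

```java
import java.util.Map;

// Hypothetical stand-in for the check in MqttCondition; a Map replaces Spring's Environment.
final class MqttSwitchDemo {

    // Mirrors the condition: read "mqtt.isOpen" and parse it as a boolean.
    static boolean isMqttEnabled(Map<String, String> properties) {
        // Map.get returns null for a missing key, and parseBoolean(null) is false.
        return Boolean.parseBoolean(properties.get("mqtt.isOpen"));
    }

    public static void main(String[] args) {
        System.out.println(isMqttEnabled(Map.of("mqtt.isOpen", "true")));
        System.out.println(isMqttEnabled(Map.of("mqtt.isOpen", "yes")));
        System.out.println(isMqttEnabled(Map.of()));
    }
}
```

A value such as `yes` or `1` therefore does not enable the client; only `true` does.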

6. MQTT client class

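import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MqttAcceptClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);

    public static MqttClient client;
    @Autowired
    private MqttAcceptCallback mqttAcceptCallback;
    @Autowired
    private MqttProperties mqttProperties;

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    // Create the client from the configured properties and connect to the broker
    public void connect() {
        MqttClient client;
        try {
            client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            options.setCleanSession(mqttProperties.getCleanSession());
            MqttAcceptClient.setClient(client);
            // Register the callback before connecting
            client.setCallback(mqttAcceptCallback);
            client.connect(options);
        } catch (Exception e) {
            logger.error("[Client connection initialization failed]", e);
        }
    }

    // Reconnect manually, retrying every 10 seconds until the connection succeeds
    public void reconnection() {
        try {
            while (true) {
                if (client != null) {
                    client.close();
                }
                this.connect();
                if (client != null && client.isConnected()) {
                    logger.info("MQTT reconnected: {}", client);
                    break;
                }
                Thread.sleep(10000);
            }
        } catch (MqttException e) {
            logger.error("MQTT reconnection failed", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
            logger.error("MQTT reconnection interrupted", e);
        }
    }

    // Subscribe to a topic with the given QoS
    public void subscribe(String topic, int qos) {
        logger.info("Subscribing to topic: {}", topic);
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("Failed to subscribe to topic {}", topic, e);
        }
    }

    // Unsubscribe from a topic
    public void unsubscribe(String topic) {
        logger.info("Unsubscribing from topic: {}", topic);
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            logger.error("Failed to unsubscribe from topic {}", topic, e);
        }
    }
}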
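
The `reconnection()` method retries forever at a fixed 10-second interval. One common alternative is a bounded retry with exponential backoff. The sketch below is an illustration under assumptions, not the article's method: `ReconnectBackoff`, `delayMillis`, and `retry` are hypothetical names, and the connection attempt is passed in as a `BooleanSupplier` so the example stays independent of any MQTT library.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: retries a connection attempt with exponential backoff.
final class ReconnectBackoff {

    // Delay before the next try after the given 0-based attempt:
    // initialMillis doubled once per attempt, capped at maxMillis.
    static long delayMillis(int attempt, long initialMillis, long maxMillis) {
        long delay = initialMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // Calls connectAttempt until it returns true or maxAttempts is reached.
    // Returns the number of attempts used on success, or -1 on failure or interruption.
    static int retry(BooleanSupplier connectAttempt, int maxAttempts,
                     long initialMillis, long maxMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (connectAttempt.getAsBoolean()) {
                return attempt + 1;
            }
            if (attempt < maxAttempts - 1) {
                try {
                    Thread.sleep(delayMillis(attempt, initialMillis, maxMillis));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

In the client above, the supplier could wrap a call to `connect()` followed by `client.isConnected()`. Capping the number of attempts keeps a permanently unreachable broker from blocking the calling thread indefinitely.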

7. MQTT consumer callback class

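import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Component
public class MqttAcceptCallback implements MqttCallbackExtended {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);

    // Placeholder: replace with your own topic.
    // A filter ending in /# subscribes to every topic under the "test" level.
    private static final String TOPIC = "test/#";

    // The client and this callback reference each other; @Lazy breaks the cycle,
    // which Spring Boot 2.6 and later reject by default.
    @Lazy
    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    // Application service from the original project (not shown in this article)
    @Autowired
    private AlertVehicleService alertVehicleService;

    // Called when the connection to the broker is lost
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("Connection lost, a reconnect can be attempted here");
        if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
            logger.info("Reconnecting to EMQX...");
            mqttAcceptClient.reconnection();
        }
    }

    // Called when a message arrives on a subscribed topic
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        logger.info("Received message on topic: {}", topic);
        String payload = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
        logger.info("Received message: {}", payload);
    }

    // Called when delivery of a published message has completed
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        if (topics != null) {
            for (String topic : topics) {
                logger.info("Message published to topic {}", topic);
            }
        }
        try {
            // getMessage() returns null once the message has been delivered
            MqttMessage message = token.getMessage();
            if (message != null) {
                String s = new String(message.getPayload(), StandardCharsets.UTF_8);
                logger.info("Message content: {}", s);
            }
        } catch (MqttException e) {
            logger.error("Failed to read the delivered message", e);
        }
    }

    // Called after a successful connect or reconnect; subscribe here so that
    // subscriptions are restored after every reconnect
    @Override
    public void connectComplete(boolean reconnect, String serverUri) {
        logger.info("Client connected, ClientId: {}", MqttAcceptClient.client.getClientId());
        mqttAcceptClient.subscribe(TOPIC, 1);
    }
}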
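
The `/#` suffix mentioned in the callback is the MQTT multi-level wildcard: a filter such as `test/#` matches `test` itself and everything below it, but not a topic like `testing/a` that merely begins with the same characters. The sketch below illustrates the matching rules for `#` and the single-level wildcard `+`. It is a simplified illustration, and `TopicFilterDemo` is a hypothetical helper rather than a library class; it ignores the special handling of topics beginning with `$` and does not validate the filter beyond the position of `#`.

```java
// Hypothetical helper illustrating MQTT topic-filter matching (simplified).
final class TopicFilterDemo {

    // Returns true if the topic name matches the subscription filter.
    // "+" matches exactly one level; "#" matches its parent level and all levels below it.
    static boolean matches(String filter, String topic) {
        // A limit of -1 keeps trailing empty levels, so "test/" has two levels
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // "#" is only valid as the last level of a filter
                return i == f.length - 1;
            }
            if (i >= t.length) {
                // The filter has more levels than the topic
                return false;
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        // Without "#", filter and topic must have the same number of levels
        return f.length == t.length;
    }
}
```

With this helper, `matches("test/+/status", "test/dev1/status")` holds, while `matches("test/+", "test")` does not, because `+` requires a level to be present.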

When reposting, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/759940.html