栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java实现mqtt broker 消息处理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java实现mqtt broker 消息处理

java实现mqtt broker 消息处理 1.Eclipse Paho Java 1.1 Paho介绍

Paho Java客户端是用Java编写的MQTT客户端库,用于开发在JVM或其他Java兼容平台(例如Android)上运行的应用程序。

Paho不仅可以对接EMQ X Broker,还可以对接满足符合MQTT协议规范的消息代理服务端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1协议版本基本能满足百分之九十多的接入场景。

Paho Java客户端提供了两个API:

1:MqttAsyncClient提供了一个完全异步的API,其中活动的完成是通过注册的回调通知的。

2:MqttClientMqttAsyncClient周围的同步包装器,在这里,功能似乎与应用程序同步。

1.2 Paho实现消息收发

(1)导包

 
	org.eclipse.paho 
	org.eclipse.paho.client.mqttv3 
	1.2.5 

(2) 编写配置文件实体类

@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    
    private String brokerUrl;
    
    private String clientId;
    
    private String username;
    
    private String password;

    private String sendUrl;

    private String topicBai;

    private String topicCar;


    public String getBrokerUrl() {
        return brokerUrl;
    }

    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getSendUrl() {
        return sendUrl;
    }

    public void setSendUrl(String sendUrl) {
        this.sendUrl = sendUrl;
    }

    public String getTopicBai() {
        return topicBai;
    }

    public void setTopicBai(String topicBai) {
        this.topicBai = topicBai;
    }

    public String getTopicCar() {
        return topicCar;
    }

    public void setTopicCar(String topicCar) {
        this.topicCar = topicCar;
    }

    @Override
    public String toString() {
        return "MqttProperties{" +
                "brokerUrl='" + brokerUrl + ''' +
                ", clientId='" + clientId + ''' +
                ", username='" + username + ''' +
                ", password='" + password + ''' +
                ", sendUrl='" + sendUrl + ''' +
                ", topicBai='" + topicBai + ''' +
                ", topicCar='" + topicCar + ''' +
                '}';
    }

(3) 编写mqtt客户端, @PostConstruct系统运行的时候会执行此方法,实现系统启动自动连接和主题订阅,

或者实现ApplicationRunner,实现自动执行

@Component
public class EmqClient {
    
    private static final Logger log = LoggerFactory.getLogger(EmqClient.class);
    
    
    private IMqttClient mqttClient;
    
    @Autowired
    private MqttProperties mqttProperties;
    
    @Autowired
    private MqttCallback mqttCallback;


    @PostConstruct
    public void start(){
        init();
        //连接服务端
		connect(mqttProperties.getUsername(),mqttProperties.getPassword());
		//订阅一个主题
		subscribe(mqttProperties.getTopicCar(), QosEnum.QoS1);
    }

    public void init(){
        MqttClientPersistence mempersitence = new MemoryPersistence();
        try {
            mqttClient = new MqttClient(mqttProperties.getBrokerUrl(),mqttProperties.getClientId(),mempersitence);
        } catch (MqttException e) {
            log.error("初始化客户端mqttClient对象失败,errormsg={},brokerUrl={},clientId={}",e.getMessage(),mqttProperties.getBrokerUrl(),mqttProperties.getClientId());
        }

    }

    
    public void connect(String username,String password){
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        mqttClient.setCallback(mqttCallback);

        try {
            mqttClient.connect(options);
        } catch (MqttException e) {
            log.error("mqtt客户端连接服务端失败,失败原因{}",e.getMessage());
        }
    }

    
    @PreDestroy
    public void disConnect(){
        try {
            mqttClient.disconnect();
        } catch (MqttException e) {
            log.error("断开连接产生异常,异常信息{}",e.getMessage());
        }
    }

    
    public void reConnect(){
        try {
            mqttClient.reconnect();
        } catch (MqttException e) {
            log.error("重连失败,失败原因{}",e.getMessage());
        }
    }

    
    public boolean publish(String topic, String msg, QosEnum qos,boolean retain){

        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(msg.getBytes());
        mqttMessage.setQos(qos.value());
        mqttMessage.setRetained(retain);
        try {
            mqttClient.publish(topic,mqttMessage);
            return true;
        } catch (MqttException e) {
            log.error("发布消息失败,errormsg={},topic={},msg={},qos={},retain={}",e.getMessage(),topic,msg,qos.value(),retain);
            return false;
        }

    }

    
    public void subscribe(String topicFilter,QosEnum qos){
        try {
            mqttClient.subscribe(topicFilter,qos.value());
        } catch (MqttException e) {
            log.error("订阅主题失败,errormsg={},topicFilter={},qos={}",e.getMessage(),topicFilter,qos.value());
        }

    }

    
    public void unSubscribe(String topicFilter){
        try {
            mqttClient.unsubscribe(topicFilter);
        } catch (MqttException e) {
            log.error("取消订阅失败,errormsg={},topicfiler={}",e.getMessage(),topicFilter);
        }
    }
    
}

(4) 创建Qos服务枚举类

public enum QosEnum {
    QoS0(0),QoS1(1),QoS2(2);


    private final int value;

    QosEnum(int value) {
        this.value = value;
    }
    
    public int value(){
        return this.value;
    }
}

(5)消息回调,在连接接收到消息之后,我们需要将消息传入消息回调

@Component
public class MessageCallback implements MqttCallback {
    
    private static final Logger log = LoggerFactory.getLogger(MessageCallback.class);

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private EmqClient emqClient;

    
    @Override
    public void connectionLost(Throwable cause) {
        // 资源的清理  重连
        log.info("丢失了对服务端的连接");
        emqClient.reConnect();
    }

    
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        log.info("订阅者订阅到了消息,主题topic={},消息IDmessageid={},消息级别qos={},内容payload={}",
                topic,
                message.getId(),
                message.getQos(),
                new String(message.getPayload()));
        }catch (Exception e){
            log.info("消息发送失败:消息id={},errorMsg={}", message.getId(), e.getMessage());
        }

    }

    
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        log.info("消息发布完成,messageid={},topics={}",messageId,topics);
    }
}

(6) 配置文件

mqtt:  
  broker-url: tcp://xxxxx.xxx.xx.x:1883
  client-id: emq-client
  username: xxx
  password: xxx


1.3 可通过emqx webhob实现消息数据的存储
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/755397.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号