栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MQTT Client Jar分析及消息发布、订阅JAVA代码示例

MQTT Client Jar分析及消息发布、订阅JAVA代码示例

如何搭建MQTT服务器,请参见基于RabbitMQ的MQTT插件搭建MQTT服务,使用MQTTX进行收发测试

mqtt client jar

目前实现mqtt协议的broker有多个,如rabbitmq, mosquitto, emqx

mqtt client的jar也有多个,springboot针对MQTT也进行了集成【在项目中导入spring-integration-mqtt依赖】,springboot采用的mqtt client是org.eclipse.paho.client.mqttv3,
org.eclipse.paho.client.mqttv3实现了MQTT协议


  org.springframework.integration
  spring-integration-mqtt
  5.5.4

client.mqttv3源码目录分析

  • org.eclipse.paho.client.mqttv3 :主要用于对外提供服务,此包里提供的整个功能。
  • org.eclipse.paho.client.mqttv3.internal: 提供了对mqttv3 中的接口的实现。
  • org.eclipse.paho.client.mqttv3.internal.nls: 国际化相关文件。学习中可以忽略
  • org.eclipse.paho.client.mqttv3.internal.security:MQTT支持SSL加密,这个包内实现了基于TLS协议SSLSocket;
  • org.eclipse.paho.client.mqttv3.internal.wire : MQTT协议中报文信息,里面包含有心跳包、订阅包、发布包、确认包等
  • org.eclipse.paho.client.mqttv3.persist:发布信息持久化类,MQTT提供两种保持发布消息的方式,
    • 一种是将信息保持到文件中,
    • 一种是直接保持 到 内存中。
  • org.eclipse.paho.client.mqttv3.util:工具类。
  • org.eclipse.paho.client.mqttv3.logging:日志包
服务质量QoS

消息的发布和确认等一些流程,主要是跟消息发布者所设定的QoS level有关

有三种发布消息服务质量

  • 至多一次 Qos level=0: 消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。适用场景如下:
    • 环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    • 应急或危化企业的传感器数据
  • 至少一次Qos level=1: 确保消息到达,但消息重复可能会发生。
  • 只有一次Qos level=2: 确保消息到达一次。这一级别可用于如下情况。适用场景如下:
    • 在计费系统中,消息重复或丢失会导致不正确的结果。
    • 电商的订单、交付系统,消息重复可引起重复下单或扣费
MQTT报文类型

MQTT消息类型(4-7),使用4位二进制表示,可代表16种消息类型

MnemonicEnumerationDescription中文描述
Reserved0Reserved保留
CONNECT1Client request to connect to Server客户端请求连接到服务器
CONNACK2Connect Acknowledgment连接确认
PUBLISH3Publish message发布消息
PUBACK4Publish Acknowledgment发布确认
PUBREC5Publish Received (assured delivery part 1)发布收稿(有保证的交付第一部分)
PUBREL6Publish Release (assured delivery part 2)发布发行(有保证的交付第二部分)
PUBCOMP7Publish Complete (assured delivery part 3)发布完成(有保证的交付第三部分)
SUBSCRIBE8Client Subscribe request客户端订阅请求
SUBACK9Subscribe Acknowledgment订阅确认
UNSUBSCRIBE10Client Unsubscribe request客户端退订请求
UNSUBACK11Unsubscribe Acknowledgment退订确认
PINGREQ12PING Requestping请求
PINGRESP13PING Responseping响应
DISCONNECT14Client is Disconnecting客户端正在断开
Reserved15Reserved保留
代码示例 导入maven依赖
		
            org.eclipse.paho
            org.eclipse.paho.client.mqttv3
            1.2.5
            compile
        
        
            org.projectlombok
            lombok
            1.16.20
        
        
            com.alibaba
            fastjson
            1.2.72
        
模拟客户端发消息
class MQTTPublishExample {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTExample.class);

    public static void main(String[] args) {
        String serverURI = "tcp://172.25.11.22:1883";
        String topic = "mq-topic1";
        String clientId = "test-1-58.10";
        String userName = "admin";
        String password = "20212021";

        ClientMQTT pulishClient = new ClientMQTT(serverURI, clientId, userName, password, topic);
        pulishClient.connect(false);
        pulishClient.sendMessage();
    }
}
模拟客户端订阅消息
class MQTTSubscribleExample {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTExample.class);

    public static void main(String[] args) {
        String serverURI = "tcp://172.25.11.22:1883";
        String topic = "mq-topic1";
        String clientId = "test-1-58.10";
        String userName = "admin";
        String password = "20212021";
        ClientMQTT subcribleClient = new ClientMQTT(serverURI, clientId, userName, password, topic);
        subcribleClient.connect(true);
        subcribleClient.subscribe(topic, clientId);
    }
}
MQTTClient
class ClientMQTT {
    private static final Logger LOG = LoggerFactory.getLogger(ClientMQTT.class);
    public final String serverURI;
    private final String clientId;
    private String userName = "admin";
    private String password = "20212021";
    private String topicName;


    private MqttClient client;
    private MqttConnectOptions options;
    private MqttTopic topic;

    private ScheduledExecutorService scheduler;

    public ClientMQTT(String serverURI, String clientId, String userName, String password, String topicName) {
        this.serverURI = serverURI;
        this.clientId = clientId;
        this.userName = userName;
        this.password = password;
        this.topicName = topicName;
    }

    void connect(boolean isSubcrible) {
        try {
            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,
            // MemoryPersistence设置clientid的保存形式,默认为以内存保存
            client = new MqttClient(serverURI, clientId, new MemoryPersistence());
            // MQTT的连接设置
            options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(false);
            // 设置连接的用户名
            options.setUserName(userName);
            // 设置连接的密码
            options.setPassword(password.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒
            // 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            //设置断开后重新连接
            options.setAutomaticReconnect(true);
            topic = client.getTopic(topicName);
            if (isSubcrible) {
                // 设置回调
                client.setCallback(new PushCallback());
                //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
                options.setWill(topic, "close".getBytes(), 1, true);
            }
            client.connect(options);
        } catch (MqttException e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public void subscribe(String topic, String clientId) {
        try {
            String[] topic1 = {topic};
            client.subscribe(topic, 1);
        } catch (MqttException e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public void sendMessage() {
        try {
            for (int i = 1; i < 11; i++) {
                MqttMessage message = new MqttMessage();
                message.setQos(1);  //保证消息能到达一次
                message.setRetained(true);
                message.setId(i);
                Message msg = Message.builder().clientId(clientId).msg("测试消息-" + i).build();
                message.setPayload(JSON.toJSONString(msg).getBytes());

                MqttDeliveryToken token = topic.publish(message);
                token.waitForCompletion();
                LOG.info("message is published completely! " + token.isComplete());
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public void close() {
        try {
            client.disconnect();
            client.close();
        } catch (MqttException e) {
            LOG.error(e.getMessage(), e);
        }
    }
}

class PushCallback implements MqttCallback {
    private static final Logger LOG = LoggerFactory.getLogger(PushCallback.class);

    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        LOG.info("连接断开,可以做重连");
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        LOG.info("deliveryComplete==>isComplete:{},", token.isComplete());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        LOG.info("接收消息==>主题:{},Qos:{},内容:{}", topic, message.getQos(), new String(message.getPayload()));
    }
}

@Data
@Builder
class Message {
    private String clientId;
    private String msg;
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/316212.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号