如何搭建MQTT服务器,请参见基于RabbitMQ的MQTT插件搭建MQTT服务,使用MQTTX进行收发测试
mqtt client jar目前实现mqtt协议的broker有多个,如rabbitmq, mosquitto, emqx
mqtt client的jar也有多个,springboot针对MQTT也进行了集成【在项目中导入spring-integration-mqtt依赖】,springboot采用的mqtt client是org.eclipse.paho.client.mqttv3,
org.eclipse.paho.client.mqttv3实现了MQTT协议
client.mqttv3源码目录分析org.springframework.integration spring-integration-mqtt 5.5.4
- org.eclipse.paho.client.mqttv3 :主要用于对外提供服务,此包里提供的整个功能。
- org.eclipse.paho.client.mqttv3.internal: 提供了对mqttv3 中的接口的实现。
- org.eclipse.paho.client.mqttv3.internal.nls: 国际化相关文件。学习中可以忽略
- org.eclipse.paho.client.mqttv3.internal.security:MQTT支持SSL加密,这个包内实现了基于TLS协议SSLSocket;
- org.eclipse.paho.client.mqttv3.internal.wire : MQTT协议中报文信息,里面包含有心跳包、订阅包、发布包、确认包等
- org.eclipse.paho.client.mqttv3.persist:发布信息持久化类,MQTT提供两种保持发布消息的方式,
- 一种是将信息保持到文件中,
- 一种是直接保持 到 内存中。
- org.eclipse.paho.client.mqttv3.util:工具类。
- org.eclipse.paho.client.mqttv3.logging:日志包
消息的发布和确认等一些流程,主要是跟消息发布者所设定的QoS level有关
有三种发布消息服务质量
- 至多一次 Qos level=0: 消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。适用场景如下:
- 环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- 应急或危化企业的传感器数据
- 至少一次Qos level=1: 确保消息到达,但消息重复可能会发生。
- 只有一次Qos level=2: 确保消息到达一次。这一级别可用于如下情况。适用场景如下:
- 在计费系统中,消息重复或丢失会导致不正确的结果。
- 电商的订单、交付系统,消息重复可引起重复下单或扣费
MQTT消息类型(4-7),使用4位二进制表示,可代表16种消息类型
| Mnemonic | Enumeration | Description | 中文描述 |
|---|---|---|---|
| Reserved | 0 | Reserved | 保留 |
| CONNECT | 1 | Client request to connect to Server | 客户端请求连接到服务器 |
| CONNACK | 2 | Connect Acknowledgment | 连接确认 |
| PUBLISH | 3 | Publish message | 发布消息 |
| PUBACK | 4 | Publish Acknowledgment | 发布确认 |
| PUBREC | 5 | Publish Received (assured delivery part 1) | 发布收稿(有保证的交付第一部分) |
| PUBREL | 6 | Publish Release (assured delivery part 2) | 发布发行(有保证的交付第二部分) |
| PUBCOMP | 7 | Publish Complete (assured delivery part 3) | 发布完成(有保证的交付第三部分) |
| SUBSCRIBE | 8 | Client Subscribe request | 客户端订阅请求 |
| SUBACK | 9 | Subscribe Acknowledgment | 订阅确认 |
| UNSUBSCRIBE | 10 | Client Unsubscribe request | 客户端退订请求 |
| UNSUBACK | 11 | Unsubscribe Acknowledgment | 退订确认 |
| PINGREQ | 12 | PING Request | ping请求 |
| PINGRESP | 13 | PING Response | ping响应 |
| DISCONNECT | 14 | Client is Disconnecting | 客户端正在断开 |
| Reserved | 15 | Reserved | 保留 |
模拟客户端发消息org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.5 compile org.projectlombok lombok 1.16.20 com.alibaba fastjson 1.2.72
class MQTTPublishExample {
private static final Logger LOG = LoggerFactory.getLogger(MQTTExample.class);
public static void main(String[] args) {
String serverURI = "tcp://172.25.11.22:1883";
String topic = "mq-topic1";
String clientId = "test-1-58.10";
String userName = "admin";
String password = "20212021";
ClientMQTT pulishClient = new ClientMQTT(serverURI, clientId, userName, password, topic);
pulishClient.connect(false);
pulishClient.sendMessage();
}
}
模拟客户端订阅消息
class MQTTSubscribleExample {
private static final Logger LOG = LoggerFactory.getLogger(MQTTExample.class);
public static void main(String[] args) {
String serverURI = "tcp://172.25.11.22:1883";
String topic = "mq-topic1";
String clientId = "test-1-58.10";
String userName = "admin";
String password = "20212021";
ClientMQTT subcribleClient = new ClientMQTT(serverURI, clientId, userName, password, topic);
subcribleClient.connect(true);
subcribleClient.subscribe(topic, clientId);
}
}
MQTTClient
class ClientMQTT {
private static final Logger LOG = LoggerFactory.getLogger(ClientMQTT.class);
public final String serverURI;
private final String clientId;
private String userName = "admin";
private String password = "20212021";
private String topicName;
private MqttClient client;
private MqttConnectOptions options;
private MqttTopic topic;
private ScheduledExecutorService scheduler;
public ClientMQTT(String serverURI, String clientId, String userName, String password, String topicName) {
this.serverURI = serverURI;
this.clientId = clientId;
this.userName = userName;
this.password = password;
this.topicName = topicName;
}
void connect(boolean isSubcrible) {
try {
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(serverURI, clientId, new MemoryPersistence());
// MQTT的连接设置
options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(false);
// 设置连接的用户名
options.setUserName(userName);
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒
// 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
//设置断开后重新连接
options.setAutomaticReconnect(true);
topic = client.getTopic(topicName);
if (isSubcrible) {
// 设置回调
client.setCallback(new PushCallback());
//setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
options.setWill(topic, "close".getBytes(), 1, true);
}
client.connect(options);
} catch (MqttException e) {
LOG.error(e.getMessage(), e);
}
}
public void subscribe(String topic, String clientId) {
try {
String[] topic1 = {topic};
client.subscribe(topic, 1);
} catch (MqttException e) {
LOG.error(e.getMessage(), e);
}
}
public void sendMessage() {
try {
for (int i = 1; i < 11; i++) {
MqttMessage message = new MqttMessage();
message.setQos(1); //保证消息能到达一次
message.setRetained(true);
message.setId(i);
Message msg = Message.builder().clientId(clientId).msg("测试消息-" + i).build();
message.setPayload(JSON.toJSONString(msg).getBytes());
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
LOG.info("message is published completely! " + token.isComplete());
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
public void close() {
try {
client.disconnect();
client.close();
} catch (MqttException e) {
LOG.error(e.getMessage(), e);
}
}
}
class PushCallback implements MqttCallback {
private static final Logger LOG = LoggerFactory.getLogger(PushCallback.class);
@Override
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
LOG.info("连接断开,可以做重连");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
LOG.info("deliveryComplete==>isComplete:{},", token.isComplete());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
LOG.info("接收消息==>主题:{},Qos:{},内容:{}", topic, message.getQos(), new String(message.getPayload()));
}
}
@Data
@Builder
class Message {
private String clientId;
private String msg;
}



