前端时间因为工作需求研究了mqtt客服端(时间有点久远记不太清楚了)。
工作场景是这样的:房间内有警报的硬件,每次按警报时,硬件会发布一次主题,而我们需要接受这个主题信息,并把主题内的相关数据插入到数据库。硬件是第三方的,发布消息由第三方完成,我们只需要订阅消息。
所写demo可自行下载(https://download.csdn.net/download/weixin_44451527/85047878)
解析数据后如下图(根据实际情况进行相对应的解析)
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,一个消息中间件。
mqtt分为客户端;服务端。
客户端包括订阅者和发布者; 服务端是指mqtt服务器用来处理发布者发布的消息以及下发给订阅者。
mqtt的工作原理:
发布者发布主题会传到服务器,由服务器下发到订阅该主题的订阅者。并非发布者和订阅者直接对接。
mqtt的介绍可以看这篇文章:https://blog.csdn.net/weixin_40129263/article/details/80983858
介绍的已经很想详细清楚了。
因为只用到了客户端订阅者,服务端以及客户端发布者是由第三方去完成的,所以就讲讲客服端订阅者的使用
以下属性是用来订阅消息的一些相关属性
host:MQTT服务端ip以及端口(服务器的ip,并非发布者的ip)
topIc:消息主题,发布了主题,订阅了主题,发布者和订阅者通过主题对应
clientId:客户端唯一标识
qos:服务质量:(接受消息次数,以及接受消息过程中是否会丢失)
订阅主题:通过该方法订阅主题
订阅发布者发布的主题之后,还需要实现MqttCallback 接口类;这样就可以接收到发布者发布的主题了,如何处理接收到的主题信息呢
实现MqttCallback 的 messageArrived方法即可(该方法就是处理消息主题的,有两个参数String topic, MqttMessage message,message就是存储数据的参数,所需要的信息基本都在这里)
相应的,我们的业务也就需要在该方法里处理。就不再过多说明了 。
package com.rzkj.mqtt.mqtt;
import com.rzkj.mqtt.model.baseBed;
import com.rzkj.mqtt.model.CallDevice;
import com.rzkj.mqtt.model.SaveBed;
import com.rzkj.mqtt.model.SysDept;
import com.rzkj.mqtt.model.vo.SaveBedVo;
import com.rzkj.mqtt.service.IbaseBedService;
import com.rzkj.mqtt.service.ICallDeviceService;
import com.rzkj.mqtt.model.vo.DataModel;
import com.rzkj.mqtt.service.ISysDeptService;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.util.JSONUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.ui.Model;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Component
public class PushCallback implements MqttCallback {
private static Logger logger= LoggerFactory.getLogger(PushCallback.class);
@Autowired
private ICallDeviceService callDeviceService;
@Autowired
private ISysDeptService sysDeptService;
@Autowired
private IbaseBedService baseBedService;
@Autowired
private static PushCallback pushCallback;
@Autowired
private ClientMQTT clientMQTT;
@PostConstruct
public void init(){
pushCallback = this;
pushCallback.clientMQTT=this.clientMQTT;
pushCallback.callDeviceService=this.callDeviceService;
pushCallback.sysDeptService=this.sysDeptService;
}
public void connectionLost(Throwable cause) {
logger.info("MQTT连接断开了,准备进行重连");
while (true){
logger.info("正在尝试重连");
try {//如果没有发生异常说明连接成功,如果发生异常,则死循环
for(int i=0; i<20; i++){
if ( Thread.currentThread().interrupted() ) {
break;
}
}
Thread.sleep(10000);//10秒
//当设置 options.setAutomaticReconnect(true);不需要写重连方法,设置false需要写重连方法
//pushCallback.clientMQTT.start();
logger.info("MQTT:连接成功");
break;
}catch (Exception e){
e.printStackTrace();
}
}
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
//业务处理
public void messageArrived(String topic, MqttMessage message) throws Exception {
List saveBedList=new ArrayList<>();
if (!"close".equals(new String(message.getPayload()))&& !"null".equals(message.getPayload())){
CallDevice callDevice=new CallDevice();
String record=new String(message.getPayload(),"GBK");//字符串信息
System.out.println("接收消息内容 : " + record);
callDevice.setRecord(record);
JSonObject fullData= JSONObject.fromObject(record);//完整json信息
DataModel dataModel=(DataModel)JSONObject.toBean(fullData, DataModel.class);//接受到的完整对象信息
if (dataModel!=null){
//System.out.println("接收消息内容 : " + dataModel);
callDevice.setCmd(dataModel.getCmd());
callDevice.setDeviceId(dataModel.getDeviceId());
callDevice.setDeviceModel(dataModel.getDeviceModel());
callDevice.setOrgId(dataModel.getOrgId());
callDevice.setProducedTime(dataModel.getProducedTime());
callDevice.setSerialNo(dataModel.getSerialNo());
callDevice.setVersion(dataModel.getVersion());
JSonObject data= fullData.getJSonObject("data");//数组信息
boolean bedIsNull = JSONUtils.isNull( data.get("callRoomList"));
if (!bedIsNull){
String callingBedList= data.getJSonArray("callingBedList").toString();//正在呼叫列表
callDevice.setCallingBedList(callingBedList);
}
boolean roomIsNull = JSONUtils.isNull( data.get("callRoomList"));
if (!roomIsNull){
String callRoomList= data.getJSonArray("callRoomList").toString();//门口振铃列表
callDevice.setCallRoomList(callRoomList);
}
boolean toiletIsNull = JSONUtils.isNull( data.get("callToiletList"));
if (!toiletIsNull){
String callToiletList= data.getJSonArray("callToiletList").toString();//卫生间振铃列表
callDevice.setCallToiletList(callToiletList);
}
boolean saveBedIsNull = JSONUtils.isNull( data.get("saveBedList"));
// if (!saveBedIsNull){
// String saveBed= data.getJSonArray("saveBedList").toString();//床头存储列表
// List saveBedVoList= (List)JSONArray.toList(JSONArray.fromObject(saveBed), SaveBedVo.class);//床头/卫生间信息
// for (SaveBedVo saveBedVo:saveBedVoList){
// ListsysDeptList= pushCallback.sysDeptService.selectChildrenDeptById(Long.parseLong(saveBedVo.getDeptCode())); //机构下的部门信息
// List baseBedList=pushCallback.baseBedService.selectbaseBedsList(Long.parseLong(saveBedVo.getBedName()));//对应床 位的部门信息
// if (!CollectionUtils.isEmpty(sysDeptList)&&!CollectionUtils.isEmpty(baseBedList)){
// //交集对象
// List bedList = baseBedList.stream().filter(obj -> find(String.valueOf(obj.getDeptId()), sysDeptList)).collect(Collectors.toList());
// if (!CollectionUtils.isEmpty(bedList)){
// SaveBed model=new SaveBed();
// saveBedList.add(model);
// }
//
// }
// }
// JSonArray jsonArray = JSONArray.fromObject(saveBedList);
//
// callDevice.setSaveBedList(jsonArray.toString());//老人ID进行json化
//
// System.out.println("床位信息:"+saveBedVoList);
// }
//pushCallback.callDeviceService.insertCallDevice(callDevice);
}
}
}
}
发布者和订阅者原理相差不多,只不过一个发,一个接。
其实订阅者订阅消息很好处理,就不过多说什么了。写的比较粗糙,欢迎大家留言交流。



