在阿里云上注册了两个心电检测设备:ECG_Monitor(检测ECG信号)和java-connect(java后台服务器,用于处理数据)。
1)ECG_Monitor通过AD8232检测ECG信号,通过ESP8266连接物联网。在阿里云平台通过云产品流转,将检测到的ECG信号发送给java后端处理
2)java服务器接收到数据后,接着发送数据到rabbitmq,供处理线程以后拿取数据进行处理。
1.2、ESP8266连接物联网
通过ESP8266连接物联网,现阶段将WIFI和MQTT连接信息直接写入ESP8266。
程序流程:
1)开启看门狗,如果连接失败将会无法喂狗,而导致单片机重启
2)首先连接WIFI,连接失败,下次循环继续尝试
3)连接成功后,尝试连接阿里云,连接成功后,读取AD8232检测的数据,封装为json格式,最后发送到阿里云上(因为测试时,每次都得带上检测设备,太过于麻烦,所以每次直接发送0-500)。连接失败后死循环,直到单片机重启。
4)发送数据,因为数据量较大,每次最多发送128字节,每次发送时先探测数据长度,如果大于128,多次发送
详细代码
#include #include#include char* SSID_STA = "HUAWEI-HNY28A"; char* PSD_STA = "0109253333"; #define PRODUCT_KEY "gecpspAbh0V" #define DEVICE_NAME "ECG_Monitor" #define MQTT_SERVER PRODUCT_KEY ".iot-as-mqtt.cn-shanghai.aliyuncs.com" #define MQTT_PORT 1883 #define MQTT_CLIENT_ID DEVICE_NAME "|securemode=3,signmethod=hmacsha1|" #define MQTT_USERNAME DEVICE_NAME "&" PRODUCT_KEY #define MQTT_PASSWORD "4F4FF5400301D60DB0B0FC07B8BA1F6740A8DC28" #define TOPIC "/" PRODUCT_KEY "/" DEVICE_NAME "/user/update" void mqtt_callback(char* topic, byte* payload, unsigned int lengths); void doWIFITick(); const __FlashStringHelper* connectErrorToString(int8_t code); void parseMqttResponse(char* payload); void connect_MQTT(); WiFiClient wifiClient; PubSubClient MqttClient(MQTT_SERVER, MQTT_PORT, &mqtt_callback, wifiClient); void setup() { delay(2000); Serial.begin(115200); Serial.println("."); } void loop() { ESP.wdtFeed(); doWIFITick(); if(WiFi.status() == WL_CONNECTED){ connect_MQTT(); MqttClient.loop(); DynamicJsondocument doc(4096); JsonObject ECG = doc.to (); JsonArray Data = ECG.createNestedArray("DATA"); for(word i=0;i<500;i++){ Data.add(i); } String message; Serial.println(measureJson(doc)); serializeJson(doc, message); publicECG(message); } delay(1000); } void publicECG(String message){ if(!MqttClient.connected())return; static uint8_t is = 0; if(is>5){ return; } int cut = 128; int json_len = message.length(); Serial.print("josn_len: "); Serial.println(json_len); if(json_len > cut){ MqttClient.beginPublish(TOPIC, json_len, true); int count = json_len/cut; for (int i = 0; i <= (count-1); i++){ MqttClient.print(message.substring(i * cut, (1+i)*cut)); Serial.print("message: "); Serial.println(message.substring(i * cut, (1+i)*cut)); } MqttClient.print(message.substring(cut*count, json_len)); Serial.print("last message: "); Serial.println(message.substring(cut*count, json_len)); MqttClient.endPublish(); }else{ MqttClient.publish(TOPIC, message.c_str()); Serial.println(message); } 
is++; } void connect_MQTT(){ if(MqttClient.connected()){ return; } Serial.println("try to connect MQTT Server"); int8_t ret; uint8_t retrys = 3; while(!MqttClient.connect(MQTT_CLIENT_ID, MQTT_USERNAME, MQTT_PASSWORD)){ Serial.println(connectErrorToString(MqttClient.state())); Serial.println(F("Retrying MQTT connection in 5 seconds...")); MqttClient.disconnect(); delay(5000); retrys--; if(retrys == 0){ while(1); } yield(); } Serial.println(F("Success!")); } void doWIFITick(){ static bool taskStarted = false; static bool startSTAFlag = false; static uint32_t lastWiFitick = 0; if(!startSTAFlag){ startSTAFlag = true; Serial.print("try to connect WIFI: "); Serial.println(SSID_STA); WiFi.disconnect(); WiFi.mode(WIFI_STA); WiFi.begin(SSID_STA, PSD_STA); Serial.printf("ESP has free heap: %drn", ESP.getFreeHeap()); } if(WiFi.status() != WL_CONNECTED){ if(millis() - lastWiFitick > 1000){ lastWiFitick = millis(); Serial.print("."); } }else{ if(!taskStarted){ taskStarted = true; Serial.print("rnlocal IP: "); Serial.println(WiFi.localIP()); } } } void mqtt_callback(char* topic, byte* payload, unsigned int lengths){ byte* ends = payload + lengths; for(byte* p = payload; p <= ends; p++){ Serial.print(*((char*)p)); } Serial.println(""); parseMqttResponse((char*)payload); } void parseMqttResponse(char* payload){ Serial.println("start parse MqttResponse"); StaticJsondocument<100> doc; deserializeJson(doc, payload); serializeJson(doc, Serial); } const __FlashStringHelper* connectErrorToString(int8_t code) { switch (code) { case 1: return F("The Server does not support the level of the MQTT protocol requested"); case 2: return F("The Client identifier is correct UTF-8 but not allowed by the Server"); case 3: return F("The MQTT service is unavailable"); case 4: return F("The data in the user name or password is malformed"); case 5: return F("Not authorized to connect"); case 6: return F("Exceeded reconnect rate limit. 
Please try again later."); case 7: return F("You have been banned from connecting. Please contact the MQTT server administrator for more details."); case -1: return F("Connection failed"); case -2: return F("Failed to subscribe"); case -3: return F("Connection Lost"); case -4: return F("Connection Timeout"); default: return F("Unknown error"); } }
阿里云的结果:
2、java服务器
2.1、建立与阿里云的连接
这部分程序设计分为:MqttUtil(连接阿里云的工具类)、MqttSign(根据productKey等信息,计算连接阿里云所需的各项数据,如用户名、密码等)和MacUtil(连接阿里云需要对密码进行加密,该工具类实现了MAC加密算法)
1)MqttUtil详细代码
使用Mqtt5协议连接阿里云,提供简单的发布和订阅功能;在接收到订阅消息后,通过Producer类将数据发送到rabbitmq
/**
 * Small wrapper around an MQTT {@code MqttClient}: connects with a user name
 * and password, publishes at QoS 0, and forwards every message received on a
 * subscription to a {@link Producer}.
 */
public class MqttUtil {
    private final String MqttServer;
    private final String password;
    private final String userName;
    private final String clientID;
    private final MqttClient client;

    /**
     * Creates the underlying client with in-memory persistence; no network
     * connection is opened until {@link #connect()} is called.
     *
     * @param MqttServer server URI handed to the client
     * @param password   password used by {@link #connect()}
     * @param userName   user name used by {@link #connect()}
     * @param clientID   MQTT client identifier
     * @throws MqttException if the client cannot be created
     */
    public MqttUtil(String MqttServer, String password, String userName, String clientID) throws MqttException {
        this.MqttServer = MqttServer;
        this.password = password;
        this.userName = userName;
        this.clientID = clientID;
        this.client = new MqttClient(MqttServer, clientID, new MemoryPersistence());
    }

    /**
     * Connects with a 180 s keep-alive, the stored credentials and a clean start.
     *
     * @throws MqttException if the connection attempt fails
     */
    public void connect() throws MqttException {
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setKeepAliveInterval(180);
        options.setUserName(userName);
        options.setPassword(password.getBytes(StandardCharsets.UTF_8));
        options.setCleanStart(true);
        client.connect(options);
    }

    /**
     * Publishes {@code message} to {@code topic} at QoS 0. The call is a
     * silent no-op while the client is not connected.
     *
     * @param topic   destination topic
     * @param message message to send; its QoS is overwritten with 0
     * @throws MqttException if publishing fails
     */
    public void publish(String topic, MqttMessage message) throws MqttException {
        if (client.isConnected()) {
            message.setQos(0);
            client.publish(topic, message);
        }
    }

    /**
     * Subscribes to {@code topic} and relays every arriving payload to the
     * queue described by the remaining arguments.
     *
     * @param topic      subscription to register
     * @param QUEUE_NAME name of the target queue
     * @param Host       message broker host
     * @param UserName   message broker user
     * @param Password   message broker password
     * @throws MqttException if the subscription fails
     */
    public void subscribe(MqttSubscription topic, String QUEUE_NAME, String Host, String UserName, String Password) throws MqttException {
        IMqttMessageListener relay = new MqttPostPropertyMessageListener(QUEUE_NAME, Host, UserName, Password);
        MqttSubscription[] subscriptions = {topic};
        IMqttMessageListener[] listeners = {relay};
        client.subscribe(subscriptions, listeners);
    }

    /** Listener that hands each received payload to a {@link Producer}. */
    static class MqttPostPropertyMessageListener implements IMqttMessageListener {
        private final Producer producer;

        public MqttPostPropertyMessageListener(String QUEUE_NAME, String Host, String UserName, String Password) {
            this.producer = new Producer(QUEUE_NAME, Host, UserName, Password);
        }

        /** Forwards the raw payload bytes of {@code mqttMessage}; the topic is ignored. */
        @Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws IOException, TimeoutException {
            byte[] payload = mqttMessage.getPayload();
            producer.sentMessage(payload);
        }
    }
}
2)MqttSign详细代码
重点在于calculate方法:拼装出userName和clientID,并利用MacUtil对明文密码进行加密,这些数据将用于连接阿里云
/**
 * Derives the MQTT user name, client id and password from a product key,
 * device name and device secret. Call {@link #calculate()} before reading
 * any of the getters; until then they return {@code null}.
 */
public class MqttSign {
    private String productKey;
    private String deviceName;
    private String deviceSecret;

    private String userName;
    private String userPassword;
    private String clientID;

    /**
     * @param productKey   product key used in the user name, client id and signed text
     * @param deviceName   device name used in the user name, client id and signed text
     * @param deviceSecret secret used as the HMAC key
     */
    public MqttSign(String productKey, String deviceName, String deviceSecret) {
        this.productKey = productKey;
        this.deviceName = deviceName;
        this.deviceSecret = deviceSecret;
    }

    /**
     * Computes user name, client id and password from the current field
     * values and the current system time in milliseconds.
     *
     * <p>The password is the HmacSHA1 of the text
     * {@code clientId<pk>.<dn>deviceName<dn>productKey<pk>timestamp<ts>},
     * keyed with the device secret and rendered with {@code %040x}.
     */
    public void calculate() throws NoSuchAlgorithmException, InvalidKeyException {
        final String now = Long.toString(System.currentTimeMillis());
        final String deviceId = productKey + "." + deviceName;

        userName = deviceName + "&" + productKey;
        clientID = deviceId + "|" + "timestamp=" + now
                + ",_v=paho-java-1.0.0,securemode=2,signmethod=hmacsha1|";

        String toSign = "clientId" + deviceId
                + "deviceName" + deviceName
                + "productKey" + productKey
                + "timestamp" + now;

        MacUtil signer = new MacUtil("HmacSHA1", toSign, "%040x");
        signer.Init(deviceSecret);
        userPassword = signer.fingerPrint();
    }

    public void setDeviceName(String deviceName) {
        this.deviceName = deviceName;
    }

    public void setDeviceSecret(String deviceSecret) {
        this.deviceSecret = deviceSecret;
    }

    public void setProductKey(String productKey) {
        this.productKey = productKey;
    }

    /** @return the user name from the last {@link #calculate()} call, or {@code null} */
    public String getUserName() {
        return userName;
    }

    /** @return the client id from the last {@link #calculate()} call, or {@code null} */
    public String getClientID() {
        return clientID;
    }

    /** @return the password from the last {@link #calculate()} call, or {@code null} */
    public String getUserPassword() {
        return userPassword;
    }
}
3)MacUtil详细代码
利用Java的JCE框架搭建MAC加密工具类
/**
 * Computes a message authentication code over a fixed message with the JCE
 * {@link Mac} API and renders it through a {@link String#format} pattern.
 *
 * <p>Usage: construct with algorithm, message and output format, call one of
 * the {@code Init} methods to install a key, then call {@link #fingerPrint()}.
 */
public class MacUtil {
    private Key key;
    private Mac mac;
    private KeyGenerator keyGenerator;
    private final byte[] bytes;
    private final String algorithm;
    private final String format;

    /**
     * @param algorithm JCE MAC algorithm name, e.g. {@code "HmacSHA1"}
     * @param message   text to authenticate; encoded as UTF-8
     * @param format    format pattern applied to the MAC as a non-negative
     *                  {@link BigInteger}, e.g. {@code "%040x"}
     */
    public MacUtil(String algorithm, String message, String format) {
        this(algorithm, message.getBytes(StandardCharsets.UTF_8), format);
    }

    /**
     * @param algorithm JCE MAC algorithm name
     * @param bytes     raw bytes to authenticate
     * @param format    format pattern applied to the MAC as a non-negative {@link BigInteger}
     */
    public MacUtil(String algorithm, byte[] bytes, String format) {
        this.algorithm = algorithm;
        this.bytes = bytes;
        this.format = format;
    }

    /**
     * Initialises the MAC with a key produced by a {@link KeyGenerator} that
     * is fed a {@code SHA1PRNG} seeded from {@code secret_key}.
     *
     * <p>NOTE(review): the resulting key is whatever the generator derives
     * from the seeded random source, not the secret bytes themselves, so the
     * output differs from {@link #Init(String)} for the same secret.
     *
     * @param secret_key seed material, encoded as UTF-8
     * @param i          unused; only distinguishes this overload
     * @throws NoSuchAlgorithmException if the algorithm or {@code SHA1PRNG} is unavailable
     * @throws InvalidKeyException      if the generated key is rejected by the MAC
     */
    public void Init(String secret_key, int i) throws NoSuchAlgorithmException, InvalidKeyException {
        keyGenerator = KeyGenerator.getInstance(algorithm);
        SecureRandom secureRandom = SecureRandom.getInstance("SHA1PRNG");
        secureRandom.setSeed(secret_key.getBytes(StandardCharsets.UTF_8));
        keyGenerator.init(secureRandom);
        key = keyGenerator.generateKey();
        mac = Mac.getInstance(algorithm);
        mac.init(key);
    }

    /**
     * Initialises the MAC using the UTF-8 bytes of {@code secret_key} directly
     * as the key.
     *
     * <p>Failures are reported instead of being printed and ignored, so a bad
     * algorithm name can no longer leave this object half-initialised.
     *
     * @param secret_key the shared secret
     * @throws IllegalArgumentException if {@code secret_key} is {@code null}
     *                                  (an empty secret is rejected by {@link SecretKeySpec})
     * @throws IllegalStateException    if the algorithm is unavailable or rejects the key
     */
    public void Init(String secret_key) {
        if (secret_key == null) {
            throw new IllegalArgumentException("secret_key must not be null");
        }
        try {
            Mac candidate = Mac.getInstance(algorithm);
            // Explicit charset: the message side is UTF-8 too, and the platform default may differ.
            candidate.init(new SecretKeySpec(secret_key.getBytes(StandardCharsets.UTF_8), algorithm));
            mac = candidate;
        } catch (NoSuchAlgorithmException | InvalidKeyException e) {
            throw new IllegalStateException("Cannot initialise MAC for algorithm " + algorithm, e);
        }
    }

    /**
     * Computes the MAC of the message given at construction time.
     *
     * @return the MAC formatted with the configured pattern
     * @throws IllegalStateException if no {@code Init} call has succeeded yet
     */
    public String fingerPrint() {
        if (mac == null) {
            throw new IllegalStateException("MacUtil is not initialised; call Init(...) first");
        }
        byte[] outPut = mac.doFinal(bytes);
        // Signum 1 keeps the value non-negative so the hex rendering is not sign-prefixed.
        return String.format(format, new BigInteger(1, outPut));
    }
}
2.2、将数据发送到rabbitmq
现阶段还未实现对数据的处理功能,只是将转发到java服务器的数据存储到rabbitmq中
这部分的代码主要分为Producer(将数据转发到消息队列中),Consumer(取出消息队列中的数据)
1)Producer
连接rabbitmq,建立非持久化队列,发送数据
/**
 * Publishes messages to a RabbitMQ queue through the default exchange.
 *
 * <p>The {@link MqUtil} helper is built once in the constructor from the
 * constructor arguments; the setters below only update this object's own
 * fields and do not rebuild it.
 */
public class Producer {
    private String QUEUE_NAME;
    private String Host;
    private String UserName;
    private String PassWord;
    private MqUtil mqUtil;

    /**
     * @param QUEUE_NAME queue to declare and publish to
     * @param Host       broker host passed to {@link MqUtil}
     * @param UserName   broker user passed to {@link MqUtil}
     * @param Password   broker password passed to {@link MqUtil}
     */
    public Producer(String QUEUE_NAME, String Host, String UserName, String Password) {
        this.QUEUE_NAME = QUEUE_NAME;
        this.Host = Host;
        this.UserName = UserName;
        this.PassWord = Password;
        mqUtil = new MqUtil(QUEUE_NAME, Host, UserName, Password);
    }

    /**
     * Publishes {@code message} encoded as UTF-8.
     *
     * @throws IOException      if declaring the queue or publishing fails
     * @throws TimeoutException if the broker does not answer in time
     */
    public void sentMessage(String message) throws IOException, TimeoutException {
        sentMessage(message.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Declares the queue (non-durable, non-exclusive, no auto-delete) and
     * publishes {@code content} to it via the default exchange.
     *
     * <p>The channel is opened for this call only and closed afterwards;
     * previously a new channel was created per message and never released.
     * NOTE(review): the connection from {@code mqUtil.getConnection()} is
     * left open because it is not visible here whether MqUtil shares it
     * between calls - confirm and close it too if it is created per call.
     *
     * @param content raw message body
     * @throws IOException      if declaring, publishing or closing fails
     * @throws TimeoutException if the broker does not answer in time
     */
    public void sentMessage(byte[] content) throws IOException, TimeoutException {
        Connection connection = mqUtil.getConnection();
        try (Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicPublish("", QUEUE_NAME, null, content);
        }
    }

    public void setUserName(String userName) {
        UserName = userName;
    }

    /** Changes the queue used by subsequent {@code sentMessage} calls. */
    public void setQUEUE_NAME(String QUEUE_NAME) {
        this.QUEUE_NAME = QUEUE_NAME;
    }

    public void setPassWord(String passWord) {
        PassWord = passWord;
    }

    public void setHost(String host) {
        Host = host;
    }
}
2)Consumer
建立与rabbitmq的连接,拿取消息并打印
/**
 * Runnable that subscribes to a RabbitMQ queue, prints every message body as
 * UTF-8 text and acknowledges it manually.
 *
 * <p>The {@link MqUtil} helper is built once in the constructor; the setters
 * only update this object's own fields and do not rebuild it.
 */
public class Consumer implements Runnable {
    private String QUEUE_NAME;
    private String Host;
    private String UserName;
    private String PassWord;
    private final MqUtil mqUtil;

    /**
     * @param QUEUE_NAME queue to consume from
     * @param Host       broker host passed to {@link MqUtil}
     * @param UserName   broker user passed to {@link MqUtil}
     * @param Password   broker password passed to {@link MqUtil}
     */
    public Consumer(String QUEUE_NAME, String Host, String UserName, String Password) {
        this.QUEUE_NAME = QUEUE_NAME;
        this.Host = Host;
        this.PassWord = Password;
        this.UserName = UserName;
        mqUtil = new MqUtil(QUEUE_NAME, Host, UserName, Password);
    }

    /**
     * Opens a channel, makes sure the queue exists and registers the consumer.
     *
     * <p>The channel is intentionally left open: deliveries arrive on it
     * after this method returns. Connection or channel failures are printed
     * and end the run.
     */
    @Override
    public void run() {
        try {
            Connection connection = mqUtil.getConnection();
            Channel channel = connection.createChannel();

            // Same arguments as Producer uses, so the declaration is a no-op if the
            // queue already exists and lets the consumer start before the first publish.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
                // autoAck is off below, so each delivery is acknowledged individually.
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println(consumerTag + " get message false");
            };
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void setHost(String host) {
        Host = host;
    }

    public void setPassWord(String passWord) {
        PassWord = passWord;
    }

    /** Changes the queue used by a subsequent {@link #run()}. */
    public void setQUEUE_NAME(String QUEUE_NAME) {
        this.QUEUE_NAME = QUEUE_NAME;
    }

    public void setUserName(String userName) {
        UserName = userName;
    }
}
2.3、结果
rabbitmq接收数据:
从消息队列获取数据:



