栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

对paho.MQTT遇到的问题处理两记

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

对paho.MQTT遇到的问题处理两记

文章目录
  • EMQX
    • 安装Broker
  • Python编写
    • 下载
    • 代码
  • Android配置编写及问题解决
    • 1. 根目录下
    • 2. app目录下
    • 3. AndroidManifest.xml 添加
    • 4. MainActivity.java


EMQX 安装Broker
docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:4.3.8

基于原项目修改
https://github.com/eclipse/paho.mqtt.python
https://github.com/eclipse/paho.mqtt.android

Python编写 下载
pip install paho-mqtt

若过程中出现以下问题,需升级pip

Building wheels for collected packages: paho-mqtt
  Building wheel for paho-mqtt (setup.py) ... error
  ERROR: Command errored out with exit status 1:
   command: /root/django/venv_RTCS/bin/python -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-req-build-yt56ry83/setup.py'"'"'; __file__='"'"'/tmp/pip-req-build-yt56ry83/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'rn'"'"', '"'"'n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' bdist_wheel -d /tmp/pip-wheel-0vih1nvj
       cwd: /tmp/pip-req-build-yt56ry83/
  Complete output (6 lines):
  usage: setup.py [global_opts] cmd1 [cmd1_opts] [cmd2 [cmd2_opts] ...]
     or: setup.py --help [cmd1 cmd2 ...]
     or: setup.py --help-commands
     or: setup.py cmd --help
  error: invalid command 'bdist_wheel'
  ERROR: Failed building wheel for paho-mqtt
  Running setup.py clean for paho-mqtt
Failed to build paho-mqtt
  • 升级pip
python -m pip install --upgrade pip
代码
#!/usr/bin/python
# coding:utf-8

import paho.mqtt.client as mqtt
import json

HOST = ""
PORT = 1883
user = "test"
pw = "test"
client_id = "test"

class Project_Mqtt:
    # 建立连接
    def do_connect(self, HOST, user, pw, client_id):
        self.client = mqtt.Client(client_id)
        self.client.username_pw_set(user, pw)
        self.client.connect(HOST, PORT, 60)
	
	# 处理订阅消息
    def deal_on_message(self, client, userdata, msg):
        print(msg.topic + " " + msg.payload.decode("utf-8"))
        messages = json.loads(msg.payload)

    # 订阅
    def do_subscribe(self, topic, qos=0):
        self.client.subscribe(topic, qos=qos)
        self.client.on_message = self.deal_on_message
        self.client.loop_forever()

    # 发布
    def do_publish(self, topic, message, qos=0):
        self.client.publish(topic, message, qos=qos, retain=False)

    # 停止
    def do_stop(self):
        self.client.loop_stop()
Android配置编写及问题解决

对高版本的SDK的一种暂时的可适用方法。

  • 解决问题:localbroadcastmanager未定义(高版本API已不支持),参考Github issue,引用第三方包但无效。故须在根目录下配置gradle.properties
1. 根目录下
  • gradle.properties添加
    android.enableJetifier=true
    
2. app目录下
  • build.gradle添加
    implementation 'com.android.support:support-v4:30+'
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    implementation 'androidx.localbroadcastmanager:localbroadcastmanager:1.0.0'
    
  • build.gradle修改SDK31 为 SDK30
  • build.gradle 同步
3. AndroidManifest.xml 添加

    
    
    

    
    
     

4. MainActivity.java

serverUri 须为【主机的ip】

package com.example.mqtta;

import android.widget.Toast;
import androidx.appcompat.app.AppCompatActivity;
import android.os.Bundle;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;


public class MainActivity extends AppCompatActivity {
    MqttAndroidClient mqttAndroidClient;

    // final String serverUri = "tcp://10.96.131.119:1883";

    String clientId = "test";
    final String subscriptionTopic = "test";
    final String publishTopic = "test";
    final String publishMessage = "Hello World!";
    final String name = "test";
    final String password = "test";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        clientId = clientId + System.currentTimeMillis();
		
        mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId);
        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                publishMessage();
//                if (reconnect) {
//                    addToHistory("Reconnected to : " + serverURI);
//                    // Because Clean Session is true, we need to re-subscribe
//                    subscribeToTopic();
//                } else {
//                    addToHistory("Connected to: " + serverURI);
//                }
            }

            @Override
            public void connectionLost(Throwable cause) {
            }
			// 处理订阅消息
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                Toast.makeText(MainActivity.this, new String(message.getPayload()), Toast.LENGTH_LONG).show();
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });
        
		// 连接设置
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setUserName(name);
        mqttConnectOptions.setPassword(password.toCharArray());

        try {
            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
//                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
//                    disconnectedBufferOptions.setBufferEnabled(true);
//                    disconnectedBufferOptions.setBufferSize(100);
//                    disconnectedBufferOptions.setPersistBuffer(false);
//                    disconnectedBufferOptions.setDeleteOldestMessages(false);
//                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                    Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_LONG).show();
                    subscribeToTopic();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Toast.makeText(MainActivity.this, "连接失败", Toast.LENGTH_LONG).show();
//                    addToHistory("Failed to connect to: " + serverUri);
                }
            });


        } catch (MqttException ex){
            ex.printStackTrace();
        }
    }
	// 订阅
    public void subscribeToTopic(){
        try {
            mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Toast.makeText(MainActivity.this, "订阅成功", Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                }
            });
//
//            // THIS DOES NOT WORK!
//            mqttAndroidClient.subscribe(subscriptionTopic, 0, new IMqttMessageListener() {
//                @Override
//                public void messageArrived(String topic, MqttMessage message) throws Exception {
//                    // message Arrived!
//                    System.out.println("Message: " + topic + " : " + new String(message.getPayload()));
//                }
//            });

        } catch (MqttException ex){
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }
    }
		// 发布
        public void publishMessage(){

            try {
                MqttMessage message = new MqttMessage();
                message.setPayload(publishMessage.getBytes());
                mqttAndroidClient.publish(publishTopic, message);
                Toast.makeText(MainActivity.this, "Message Published", Toast.LENGTH_SHORT).show();

                if(!mqttAndroidClient.isConnected()){
                    Toast.makeText(MainActivity.this, "Lost of Connected", Toast.LENGTH_SHORT).show();
                }
            } catch (MqttException e) {
                System.err.println("Error Publishing: " + e.getMessage());
                e.printStackTrace();
            }
        }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/349353.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号