栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

MQTT入门

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

MQTT入门

背景:大脑端突然报mqtt多次链接,不知道啥原因
报错信息:
E/MQTTManager: 连接断开:已断开连接 断开原因: java.net.SocketException: Connection reset
E/MQTTManager: 连接断开:已断开连接 断开原因: java.net.SocketException: Broken pipe
E/MQTTManager: 连接断开:已断开连接 断开原因: java.io.EOFException
解决:
增加重连保护机制,最多连7次
恶补下:
Message Queuing Telemetry(遥测:远程测量) Transport(传输协议),基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,为连接远程设备提供实时可靠的消息服务

MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

(1)Connect。等待与服务器建立连接。
(2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。
(3)Subscribe。等待完成订阅。
(4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。
(5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。

application中监听了所有activity的生命周期,在第一个activity启动时调用mqtt

 registerActivityLifecycleCallbacks(activityLifecycleCallbacks);

int activityAount = 0;

    ActivityLifecycleCallbacks activityLifecycleCallbacks = new ActivityLifecycleCallbacks() {
        @Override
        public void onActivityCreated(@NotNull Activity activity, Bundle savedInstanceState) {
            activityList.add(activity);
            ALog.d( "onActivityCreated",activity.getClass().getSimpleName(),activityList.size());
            Log.e("wy", "onActivityCreated   activity.getClass().getSimpleName: "+activity.getClass().getSimpleName()+" activityList.size: " +activityList.size());
            
        }

        @Override
        public void onActivityStarted(Activity activity) {
            ALog.d( "onActivityStarted: "+activity.getClass().getSimpleName());
            Log.e("wy", "onActivityStarted   activity.getClass().getSimpleName: "+activity.getClass().getSimpleName()+" activityList.size: " +activityList.size());

            if (activityAount == 0) {
                //app回到前台
                if (!debug) {
                    if (mqttManager == null) {
                        mqttManager = MQTTManager.getInstance(instance);
                        mqttManager.setMqConnectLisense(new MQTTManager.MqConnectLisense() {
                            @Override
                            public void onSuccess() {
                                Log.e("wy", "onSuccess: " );
                                mqttManager.publish("{"domain":"video_chat","command":"start"}");
                            }

                            @Override
                            public void onFailure(Throwable t) {
                                ALog.i("连接失败",t);
                                activity.runonUiThread(() -> {
                                    Toast.makeText(activity, t.toString(), Toast.LENGTH_LONG).show();
                                });
                            }
                        });
                    } else {
                        Log.e("wy", "170onSuccess: " );
                        mqttManager.publish("{"domain":"video_chat","command":"start"}");
                    }
                }
                ALog.d( "app启动或回到前台");
            }
            activityAount++;
        }
        }

在MQTTManager中
初始化

HOST = "tcp://" + getIp() + ":1883";
        Log.d(TAG, "HOST = " + HOST);
        String serverURI = HOST; //服务器地址(协议+地址+端口号)
        mqttAndroidClient = new MqttAndroidClient(mContext, serverURI, CLIENTID);
        mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调
        mMqttConnectOptions = new MqttConnectOptions();
        mMqttConnectOptions.setCleanSession(false); //设置是否清除缓存
        mMqttConnectOptions.setConnectionTimeout(10); //设置超时时间,单位:秒
        mMqttConnectOptions.setKeepAliveInterval(20); //设置心跳包发送间隔,单位:秒
        mMqttConnectOptions.setUserName(USERNAME); //设置用户名
        mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //设置密码
 int connectNum=0;

    //订阅主题的回调
    private MqttCallback mqttCallback = new MqttCallback() {

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            Log.i(TAG, "收到消息: " + new String(message.getPayload()));
            Log.e("wy", "收到消息: " + new String(message.getPayload()));
            //收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等
            //response("message arrived");
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {

        }

        @Override
        public void connectionLost(Throwable arg0) {
            Log.e(TAG, "连接断开:"+arg0.getMessage()+" 断开原因: "+arg0.getCause());

            if(connectNum<7){
                connectNum++;
                doClientConnection();//连接断开,重连

            }

        }
    };
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/336437.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号