栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

基于Netty实现Mqtt客户端(三)-创建连接

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于Netty实现Mqtt客户端(三)-创建连接

如何创建mqtt连接

要建立一个mqtt连接,分两个步骤:

  • 创建socket长连接
  • 发送连接报文(mqtt协议)
创建长连接
private void doConnect(MqttConnectOptions options, long timeout) throws Exception {
	// 创建连接
	EventLoopGroup group = new NioEventLoopGroup();
	connectTask = new AsyncTask() {
	    @Override
	    public String call() throws Exception {
	        Bootstrap b = new Bootstrap()
	                .group(group)
	                .channel(NioSocketChannel.class)
	                .handler(new ChannelInitializer() {
	                    @Override
	                    protected void initChannel(SocketChannel channel) throws Exception {
	                        channel.pipeline()
	                                .addLast("decoder", new MqttDecoder())//解码
	                                .addLast("encoder", MqttEncoder.INSTANCE)//编码
	                                .addLast("handler", new MqttHandler());
	                    }
	                });
	        ChannelFuture ch = b.connect(options.getHost(), options.getPort()).sync();
	        channel = ch.channel();
	        Log.i("--已连接->" + channel.localAddress().toString());
	        return null;
	    }
	}.execute();
	try {
	    connectTask.get(timeout, TimeUnit.MILLISECONDS);
	} catch (Exception e) {
	    Log.i("-->连接异常:" + e);
	    group.shutdownGracefully();
	    throw e;
	}
	
	if (channel == null)
	    return;
	
	// 发送mqtt协议连接报文
	doConnect0(channel, options, timeout);
	
	// 等待连接关闭的任务
	connectTask = new AsyncTask() {
	    @Override
	    public String call() throws Exception {
	        try {
	            channel.closeFuture().sync();
	        } catch (Exception e) {
	            Log.i("-->连接断开异常:" + e);
	        } finally {
	            group.shutdownGracefully();
	            if (!isClosed()) {
	                // 非主动断开,可能源于服务器原因
	                Exception e = new ConnectException("Connection closed unexpectedly");
	                Log.i("-->连接断开:" + e);
	                onConnectLost(e);
	            } else {
	                Log.i("-->连接断开:主动");
	            }
	        }
	        return null;
	    }
	}.execute();
}

注:AsyncTask是封装的异步任务类,类似FutureTask

发送连接报文
private void doConnect0(Channel channel, MqttConnectOptions options, long timeout) throws Exception {
    if (channel == null)
        return;

    try {
        connectProcessor = new ConnectProcessor();
        String s = connectProcessor.connect(channel, options, timeout);
        if (ProcessorResult.RESULT_SUCCESS.equals(s)) {
            Log.i("-->连接成功");
        } else {
            throw new CancellationException();
        }
    } catch (Exception e) {
        if (e instanceof CancellationException) {
            Log.i("-->连接取消");
        } else {
            Log.i("-->连接异常:" + e);
            throw e;
        }
    }
}

connectProcessor.java

package io.x2ge.mqtt.core;

import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.x2ge.mqtt.MqttConnectOptions;
import io.x2ge.mqtt.utils.AsyncTask;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConnectProcessor extends AsyncTask {

    private final AtomicBoolean receivedAck = new AtomicBoolean(false);
    private Exception e;

    @Override
    public String call() throws Exception {
        while (!isCancelled() && !receivedAck.get()) {
            synchronized (receivedAck) {
                if (e != null) {
                    throw e;
                }
                try {
                    receivedAck.wait(300L);
                } catch (Exception ex) {
//                    ex.printStackTrace();
                }
            }
        }
        return receivedAck.get() ? ProcessorResult.RESULT_SUCCESS : ProcessorResult.RESULT_FAIL;
    }

    public String connect(Channel channel, MqttConnectOptions options, long timeout) throws Exception {
        channel.writeAndFlush(ProtocolUtils.connectMessage(options));
        return execute().get(timeout, TimeUnit.MILLISECONDS);
    }

    public void processAck(Channel channel, MqttConnAckMessage msg) {
        MqttConnAckVariableHeader mqttConnAckVariableHeader = msg.variableHeader();
        String errormsg = "";
        switch (mqttConnAckVariableHeader.connectReturnCode()) {
            case CONNECTION_ACCEPTED:
                synchronized (receivedAck) {
                    receivedAck.set(true);
                    receivedAck.notify();
                }
                return;
            case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
                errormsg = "用户名密码错误";
                break;
            case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
                errormsg = "clientId不允许链接";
                break;
            case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
                errormsg = "服务不可用";
                break;
            case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
                errormsg = "mqtt 版本不可用";
                break;
            case CONNECTION_REFUSED_NOT_AUTHORIZED:
                errormsg = "未授权登录";
                break;
            default:
                errormsg = "未知问题";
                break;
        }

        synchronized (receivedAck) {
            e = new IOException(errormsg);
            receivedAck.notify();
        }
    }
}

生成连接报文:

public static MqttConnectMessage connectMessage(MqttConnectOptions options) {
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
            false,
            MqttQoS.AT_MOST_ONCE,
            false,
            10);
    MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
            options.getMqttVersion().protocolName(),
            options.getMqttVersion().protocolLevel(),
            options.isHasUserName(),
            options.isHasPassword(),
            options.isWillRetain(),
            options.getWillQos(),
            options.isWillFlag(),
            options.isCleanSession(),
            options.getKeepAliveTime());
    MqttConnectPayload payload = new MqttConnectPayload(
            options.getClientIdentifier(),
            options.getWillTopic(),
            options.getWillMessage(),
            options.getUserName(),
            options.getPassword());
    return new MqttConnectMessage(fixedHeader, variableHeader, payload);
}

在自定义的MqttHandler中监听连接报文的应答:

class MqttHandler extends SimpleChannelInboundHandler {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msgx) throws Exception {
        if (msgx == null) {
            return;
        }

        switch (mqttFixedHeader.messageType()) {
            case CONNACK:
            	// 连接报文的响应
                if (connectProcessor != null)
                    connectProcessor.processAck(ctx.channel(), (MqttConnAckMessage) msg);
                break;
            case SUBACK:
            	// 订阅报文的响应
                break;
            case UNSUBACK:
            	// 取消订阅报文的响应
                break;
            case PUBLISH:
            	 // 收到消息报文
                break;
            case PUBACK:
            	// 发布消息报文响应
                // qos = 1的发布才有该回应
                break;
            case PUBREC:
                // qos = 2的发布才参与
                break;
            case PUBREL:
                // qos = 2的发布才参与
                break;
            case PUBCOMP:
                // qos = 2的发布才参与
                break;
            case PINGRESP:
            	// ping报文响应
                break;
            default:
                break;
        }
    }
}
 
连接报文内容 

客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是CONNECT报文。
在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接 。
报文包含固定报头、可变报头、有效载荷三部分。有效载荷包含一个或多个编码的字段。包括客户端的唯一标识符,Will主题,Will消息,用户名和密码。除了客户端标识之外,其它的字段都是可选的,基于标志位来决定可变报头中是否需要包含这些字段。
注:完整的报文内容,此处不做赘述,请翻阅mqtt协议书以做了解。

项目源码

netty-mqtt-client

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号