要建立一个MQTT连接,需要两个步骤:
- 创建socket长连接
- 发送连接报文(MQTT协议的CONNECT报文)
/**
 * Establishes an MQTT connection in two steps: opens the socket through Netty, then
 * sends the MQTT CONNECT packet via {@code doConnect0}. Once both steps are done, a
 * background task is started that waits for the channel to close and reports an
 * unexpected disconnect through {@code onConnectLost}.
 *
 * <p>Every exit path releases the event loop group created here: a failed socket
 * connect, a missing channel, a failed CONNECT handshake, and the eventual close of
 * the channel.
 *
 * @param options connection settings (host, port and the CONNECT packet fields)
 * @param timeout maximum time in milliseconds to wait for the socket connect; the same
 *                value is passed on to the CONNECT handshake
 * @throws Exception if the socket connect fails or times out, or if the handshake fails
 */
private void doConnect(MqttConnectOptions options, long timeout) throws Exception {
    // One event loop group per connection attempt; it must be shut down on every exit path.
    final EventLoopGroup group = new NioEventLoopGroup();

    // Step 1: open the socket on a background task so the wait can be bounded by timeout.
    connectTask = new AsyncTask<String>() {
        @Override
        public String call() throws Exception {
            Bootstrap b = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                    .addLast("decoder", new MqttDecoder()) // inbound bytes -> MQTT messages
                                    .addLast("encoder", MqttEncoder.INSTANCE) // MQTT messages -> outbound bytes
                                    .addLast("handler", new MqttHandler());
                        }
                    });
            ChannelFuture ch = b.connect(options.getHost(), options.getPort()).sync();
            channel = ch.channel();
            Log.i("--已连接->" + channel.localAddress().toString());
            return null;
        }
    }.execute();

    try {
        connectTask.get(timeout, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        Log.i("-->连接异常:" + e);
        group.shutdownGracefully();
        throw e;
    }

    // Work on the channel opened by this call, not on whatever the field holds later.
    final Channel connected = channel;
    if (connected == null) {
        // Nothing will ever close this group otherwise.
        group.shutdownGracefully();
        return;
    }

    // Step 2: send the MQTT CONNECT packet.
    try {
        doConnect0(connected, options, timeout);
    } catch (Exception e) {
        // The close-watcher below is not running yet, so release the socket and the
        // event loop threads here before propagating the failure.
        connected.close();
        group.shutdownGracefully();
        throw e;
    }

    // Step 3: wait for the connection to close and clean up afterwards.
    connectTask = new AsyncTask<String>() {
        @Override
        public String call() throws Exception {
            try {
                connected.closeFuture().sync();
            } catch (Exception e) {
                Log.i("-->连接断开异常:" + e);
            } finally {
                group.shutdownGracefully();
                if (!isClosed()) {
                    // Not closed by the client itself; possibly caused by the server.
                    Exception e = new ConnectException("Connection closed unexpectedly");
                    Log.i("-->连接断开:" + e);
                    onConnectLost(e);
                } else {
                    Log.i("-->连接断开:主动");
                }
            }
            return null;
        }
    }.execute();
}
注:AsyncTask是封装的异步任务类,类似FutureTask
发送连接报文:
/**
 * Sends the MQTT CONNECT packet on the given channel and waits for the outcome.
 *
 * <p>A non-success result and a {@link CancellationException} are both treated as a
 * cancelled attempt: they are logged and the method returns normally. Any other
 * exception is logged and rethrown.
 *
 * @param channel the open channel to send on; the method does nothing when it is null
 * @param options connection settings used to build the CONNECT packet
 * @param timeout maximum time in milliseconds to wait for the result
 * @throws Exception any failure other than a cancellation
 */
private void doConnect0(Channel channel, MqttConnectOptions options, long timeout) throws Exception {
    if (channel == null) {
        return;
    }
    try {
        connectProcessor = new ConnectProcessor();
        final String result = connectProcessor.connect(channel, options, timeout);
        if (!ProcessorResult.RESULT_SUCCESS.equals(result)) {
            // Route a non-success result through the same path as a cancelled task.
            throw new CancellationException();
        }
        Log.i("-->连接成功");
    } catch (CancellationException cancelled) {
        Log.i("-->连接取消");
    } catch (Exception failure) {
        Log.i("-->连接异常:" + failure);
        throw failure;
    }
}
ConnectProcessor.java
package io.x2ge.mqtt.core;

import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.x2ge.mqtt.MqttConnectOptions;
import io.x2ge.mqtt.utils.AsyncTask;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sends the MQTT CONNECT packet and waits for the matching CONNACK.
 *
 * <p>{@link #connect} writes the packet and blocks on the background task defined by
 * {@link #call}; {@link #processAck} is the entry point through which the received
 * CONNACK is delivered. All shared state is guarded by the monitor of
 * {@code receivedAck}.
 */
public class ConnectProcessor extends AsyncTask<String> {

    /** Upper bound for one wait, so cancellation is noticed even without a notify. */
    private static final long POLL_INTERVAL_MILLIS = 300L;

    private final AtomicBoolean receivedAck = new AtomicBoolean(false);

    /** Set once the caller of {@link #connect} has stopped waiting for the result. */
    private volatile boolean abandoned;

    /** Failure derived from a refusing CONNACK; guarded by {@code receivedAck}. */
    private Exception refusal;

    /**
     * Waits until the CONNACK arrives, the task is cancelled, or the caller gives up.
     *
     * @return {@code ProcessorResult.RESULT_SUCCESS} if the connection was accepted,
     *         otherwise {@code ProcessorResult.RESULT_FAIL}
     * @throws Exception the failure recorded by {@link #processAck} for a refused connection
     */
    @Override
    public String call() throws Exception {
        while (!isCancelled() && !abandoned && !receivedAck.get()) {
            synchronized (receivedAck) {
                if (refusal != null) {
                    throw refusal;
                }
                // Re-check under the lock: a notify issued between the loop test and
                // this point would otherwise be missed and cost a full poll interval.
                if (abandoned || receivedAck.get()) {
                    break;
                }
                try {
                    receivedAck.wait(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the loop condition decides when to stop, and
                    // restoring the interrupt flag would make every later wait fail at once.
                }
            }
        }
        return receivedAck.get() ? ProcessorResult.RESULT_SUCCESS : ProcessorResult.RESULT_FAIL;
    }

    /**
     * Writes the CONNECT packet to the channel and waits for the outcome.
     *
     * @param channel the open channel to write to
     * @param options connection settings used to build the CONNECT packet
     * @param timeout maximum time in milliseconds to wait for the outcome
     * @return the result produced by {@link #call}
     * @throws Exception if the wait times out or is interrupted, or the connection is refused
     */
    public String connect(Channel channel, MqttConnectOptions options, long timeout) throws Exception {
        channel.writeAndFlush(ProtocolUtils.connectMessage(options));
        try {
            return execute().get(timeout, TimeUnit.MILLISECONDS);
        } finally {
            // When get() times out or is interrupted the background task would otherwise
            // keep polling forever; tell it to stop and wake it up right away.
            synchronized (receivedAck) {
                abandoned = true;
                receivedAck.notifyAll();
            }
        }
    }

    /**
     * Records the outcome carried by a CONNACK message and wakes the waiting task.
     *
     * @param channel the channel the message arrived on (not used here)
     * @param msg     the received CONNACK
     */
    public void processAck(Channel channel, MqttConnAckMessage msg) {
        MqttConnAckVariableHeader ackHeader = msg.variableHeader();
        String errormsg;
        switch (ackHeader.connectReturnCode()) {
            case CONNECTION_ACCEPTED:
                synchronized (receivedAck) {
                    receivedAck.set(true);
                    receivedAck.notifyAll();
                }
                return;
            case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
                errormsg = "用户名密码错误";
                break;
            case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
                errormsg = "clientId不允许链接";
                break;
            case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
                errormsg = "服务不可用";
                break;
            case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
                errormsg = "mqtt 版本不可用";
                break;
            case CONNECTION_REFUSED_NOT_AUTHORIZED:
                errormsg = "未授权登录";
                break;
            default:
                errormsg = "未知问题";
                break;
        }
        synchronized (receivedAck) {
            refusal = new IOException(errormsg);
            receivedAck.notifyAll();
        }
    }
}
生成连接报文:
/**
 * Builds the MQTT CONNECT message described by the given options.
 *
 * @param options source of the protocol version, connect flags, keep-alive time,
 *                client identifier, will topic/message and credentials
 * @return a CONNECT message made of fixed header, variable header and payload
 */
public static MqttConnectMessage connectMessage(MqttConnectOptions options) {
    // Fixed header: CONNECT, not a duplicate, QoS 0, not retained.
    // NOTE(review): the trailing 10 is the remaining-length argument; presumably the
    // encoder recomputes it when writing the packet — confirm.
    MqttFixedHeader header = new MqttFixedHeader(
            MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10);

    // Variable header: protocol name/level, connect flags and keep-alive time.
    MqttConnectVariableHeader connectFlags = new MqttConnectVariableHeader(
            options.getMqttVersion().protocolName(),
            options.getMqttVersion().protocolLevel(),
            options.isHasUserName(),
            options.isHasPassword(),
            options.isWillRetain(),
            options.getWillQos(),
            options.isWillFlag(),
            options.isCleanSession(),
            options.getKeepAliveTime());

    // Payload: client identifier, will topic and message, user name and password.
    MqttConnectPayload body = new MqttConnectPayload(
            options.getClientIdentifier(),
            options.getWillTopic(),
            options.getWillMessage(),
            options.getUserName(),
            options.getPassword());

    return new MqttConnectMessage(header, connectFlags, body);
}
在自定义的MqttHandler中监听连接报文的应答:
class MqttHandler extends SimpleChannelInboundHandler …(此处代码片段在原文中被截断)

连接报文内容
客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是CONNECT报文。
在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接。
报文包含固定报头、可变报头、有效载荷三部分。有效载荷包含一个或多个编码的字段。包括客户端的唯一标识符,Will主题,Will消息,用户名和密码。除了客户端标识之外,其它的字段都是可选的,基于标志位来决定可变报头中是否需要包含这些字段。
注:完整的报文内容此处不再赘述,请查阅MQTT协议规范以作了解。
netty-mqtt-client



