3.3.5 私有协议开发
3.3.5.1 私有协议介绍3.3.5.2 网络拓扑图3.3.5.3 功能栈描述3.3.5.4 基本搭建3.3.5.5 编解码3.3.5.6 握手与安全认证3.3.5.7 心跳检测3.3.5.8 服务端,客服端
3.3.5 私有协议开发 3.3.5.1 私有协议介绍私有协议本质上是厂商内部发展和采用的标准,除非授权,其他厂商一般无权使用该协议。私有协议也称非标准协议,就是未经国际或国家标准化组织采纳或批准,由某个企业自己制订,协议实现细节不愿公开,只在企业自己生产的设备之间使用的协议。私有协议具有封闭性、垄断性、排他性等特点。如果网上大量存在私有(非标准)协议,现行网络或用户一旦使用了它,后进入的厂家设备就必须跟着使用这种非标准协议,才能够互连互通,否则根本不可能进入现行网络。这样,使用非标准协议的厂家就实现了垄断市场的愿望。尽管私有协议具有垄断性的特征,但并非所有的私有协议设计者的初衷就是为了垄断。由于现代软件系统的复杂性,一个大型软件系统往往会被人为地拆分成多个模块,另外随着移动互联网的兴起,网站的规模也越来越大,业务的功能越来越多,为了能够支撑业务的发展,往往需要集群和分布式部署,这样,各个模块之间就要进行跨节点通信。跨节点的远程服务调用,除了链路层的物理连接外,还需要对请求和响应消息进行编解码。在请求和应答消息本身以外,也需要携带一些其他控制和管理类指令,例如链路建立的握手请求和响应消息、链路检测的心跳消息等。当这些功能组合到一起之后,就会形成私有协议。事实上,私有协议并没有标准的定义,只要是能够用于跨进程、跨主机数据交换的非标准协议,都可以称为私有协议。通常情况下,正规的私有协议都有具体的协议规范文档,类似于《XXXX协议VXX规范》,但是在实际的项目中,内部使用的私有协议往往是口头约定的规范,由于并不需要对外呈现或者被外部调用,所以一般不会单独写相关的内部私有协议规范文档。 3.3.5.2 网络拓扑图
在分布式组网环境下,每个Netty节点 (Netty进程)之间建立长连接,使用Netty协议进行通信。Netty节点并没有服务端和客户端的区分,谁首先发起连接,谁就作为客户端,另一方自然就成为服务端。一个Netty节点既可以作为客户端连接另外的 Netty节点,也可以作为Netty服务端被其他Netty节点连接,这完全取决于使用者的业务场景和具体的业务组网。
Netty协议栈承载了业务内部各模块之间的消息交互和服务调用,它的主要功能如下。(1)基于Netty 的NIO通信框架,提供高性能的异步通信能力。(2)提供消息的编解码框架,可以实现POJO的序列化和反序列化。(3)提供基于IP地址的白名单接入认证机制。(4)链路的有效性校验机制。(5)链路的断连重连机制。
注意:需要指出的是,Netty协议通信双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息给对方,通信方式可以是 TWOWAY或者ONE WAY。双方之间的心跳采用Ping-Pong机制,当链路处于空闲状态时,客户端主动发送Ping消息给服务端,服务端接收到Ping消息后发送应答消息 Pong给客户端,如果客户端连续发送N条Ping消息都没有接收到服务端返回的Pong消息,说明链路已经挂死或者对方处于异常状态,客户端主动关闭连接,间隔周期T后发起重连操作,直到重连成功。
3.3.5.4 基本搭建定义消息实体类
package com.shu.Pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class NettyMessage {
private Head header; // 头部
private Object body; // 消息正文
}
package com.shu.Pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Head {
// 1010 1011 1101 1110 0000 0001 0000 0001
private int crcCode = 0xabef0101; // 固定值,表名这是Netty消息 4字节
private int length;// 消息长度
private long sessionID;// 会话ID
private byte type;// 消息类型
private byte priority;// 消息优先级
private Map attachment = new HashMap(); // 扩展消息头
}
序列化器
package com.shu.Factory;
import org.jboss.marshalling.*;
import java.io.IOException;
public class MarshallingCodecFactory {
protected static Marshaller buildMarshalling() throws IOException {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
Marshaller marshaller = marshallerFactory
.createMarshaller(configuration);
return marshaller;
}
protected static Unmarshaller buildUnMarshalling() throws IOException {
final MarshallerFactory marshallerFactory = Marshalling
.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
final Unmarshaller unmarshaller = marshallerFactory
.createUnmarshaller(configuration);
return unmarshaller;
}
}
package com.shu.Factory;
import com.shu.MyByteOutput.ChannelBufferByteInput;
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;
import java.io.IOException;
public class MarshallingDecoder {
private final Unmarshaller unmarshaller;
public MarshallingDecoder() throws IOException {
unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
}
protected Object decode(ByteBuf in) throws Exception {
//1. 读取第一个4bytes,里面放置的是object对象的byte长度
int objectSize = in.readInt();
ByteBuf buf = in.slice(in.readerIndex(), objectSize);
//2 . 使用bytebuf的代理类
ByteInput input = new ChannelBufferByteInput(buf);
try {
//3. 开始解码
unmarshaller.start(input);
Object obj = unmarshaller.readObject();
unmarshaller.finish();
//4. 读完之后设置读取的位置
in.readerIndex(in.readerIndex() + objectSize);
return obj;
} finally {
unmarshaller.close();
}
}
}
package com.shu.Factory;
import org.jboss.marshalling.*;
import java.io.IOException;
public class MarshallingCodecFactory {
protected static Marshaller buildMarshalling() throws IOException {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
Marshaller marshaller = marshallerFactory
.createMarshaller(configuration);
return marshaller;
}
protected static Unmarshaller buildUnMarshalling() throws IOException {
final MarshallerFactory marshallerFactory = Marshalling
.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
final Unmarshaller unmarshaller = marshallerFactory
.createUnmarshaller(configuration);
return unmarshaller;
}
}
3.3.5.5 编解码
编码
package com.shu.MyByteOutput; import com.shu.Factory.MarshallingEncoder; import com.shu.Pojo.NettyMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.io.IOException; import java.util.Map; public class NettyMessageEncoder extends MessageToByteEncoder{ MarshallingEncoder marshallingEncoder; public NettyMessageEncoder() throws IOException { this.marshallingEncoder = new MarshallingEncoder(); } @Override protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception { if (null == msg || null == msg.getHeader()) { throw new Exception("The encode message is null"); } //---写入crcCode--- sendBuf.writeInt((msg.getHeader().getCrcCode())); //---写入length--- sendBuf.writeInt((msg.getHeader().getLength())); //---写入sessionId--- sendBuf.writeLong((msg.getHeader().getSessionID())); //---写入type--- sendBuf.writeByte((msg.getHeader().getType())); //---写入priority--- sendBuf.writeByte((msg.getHeader().getPriority())); //---写入附件大小--- sendBuf.writeInt((msg.getHeader().getAttachment().size())); String key = null; byte[] keyArray = null; Object value = null; for (Map.Entry param : msg.getHeader().getAttachment() .entrySet()) { key = param.getKey(); keyArray = key.getBytes("UTF-8"); sendBuf.writeInt(keyArray.length); sendBuf.writeBytes(keyArray); value = param.getValue(); // marshallingEncoder.encode(value, sendBuf); } // for gc key = null; keyArray = null; value = null; if (msg.getBody() != null) { marshallingEncoder.encode(msg.getBody(), sendBuf); } else sendBuf.writeInt(0); // 之前写了crcCode 4bytes,除去crcCode和length 8bytes即为更新之后的字节 sendBuf.setInt(4, sendBuf.readableBytes() - 8); } }
解码
package com.shu.MyByteOutput;
import com.shu.Factory.MarshallingDecoder;
import com.shu.Pojo.Head;
import com.shu.Pojo.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldbasedframeDecoder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class NettyMessageDecoder extends LengthFieldbasedframeDecoder {
MarshallingDecoder marshallingDecoder;
public NettyMessageDecoder(int maxframeLength, int lengthFieldOffset,
int lengthFieldLength) throws IOException {
super(maxframeLength, lengthFieldOffset, lengthFieldLength);
marshallingDecoder = new MarshallingDecoder();
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
NettyMessage message = new NettyMessage();
Head header = new Head();
header.setCrcCode(frame.readInt());
header.setLength(frame.readInt());
header.setSessionID(frame.readLong());
header.setType(frame.readByte());
header.setPriority(frame.readByte());
int size = frame.readInt();
if (size > 0) {
Map attch = new HashMap(size);
int keySize = 0;
byte[] keyArray = null;
String key = null;
for (int i = 0; i < size; i++) {
keySize = frame.readInt();
keyArray = new byte[keySize];
frame.readBytes(keyArray);
key = new String(keyArray, "UTF-8");
attch.put(key, marshallingDecoder.decode(frame));
}
keyArray = null;
key = null;
header.setAttachment(attch);
}
if (frame.readableBytes() > 4) {
message.setBody(marshallingDecoder.decode(frame));
}
message.setHeader(header);
return message;
}
}
3.3.5.6 握手与安全认证
握手的发起是在客户端和服务端TCP链路建立成功通道激活时,握手消息的接入和安全认证在服务端处理。为了保证整个集群环境的安全,内部长连接采用基于IP地址的安全认证机制,服务端对握手请求消息的IP地址进行合法性校验:如果在白名单之内,则校验通过;否则,拒绝对方连接。如果将Netty协议栈放到公网中使用,需要采用更加严格的安全认证机制,例如基于密钥和AES 加密的用户名+密码认证机制,也可以采用SSL/TSL安全传输。
package com.shu.Pojo;
public enum MessageType {
BUZ_REQUEST(0,"业务请求"),
BUZ_RESPONSE(1,"业务相应"),
BUZ_ONEWAY(2,"即是请求也是响应"),
HANDSHAKE_REQUEST(3,"握手请求"),
HANDSHAKE_RESPONSE(4,"握手响应"),
HEARTBEAT_REQUEST(5,"心跳请求"),
HEARTBEAT_RESPONSE(6,"心跳响应"),
;
private Integer type;
private String name;
MessageType(Integer type,String name){
this.name = name;
this.type = type;
}
public Integer getType() {
return type;
}
public String getName(){
return name;
}
}
握手请求处理器
package com.shu.Handler;
import com.shu.Pojo.Head;
import com.shu.Pojo.MessageType;
import com.shu.Pojo.NettyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {
private static final Log LOG = LogFactory.getLog(LoginAuthReqHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buildLoginReq());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
NettyMessage message = (NettyMessage) msg;
// 如果是握手应答消息,需要判断是否认证成功
if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.HANDSHAKE_REQUEST.getType()) {
byte loginResult = (byte) message.getBody();
if (loginResult != (byte) 0) {
// 握手失败,关闭连接
ctx.close();
} else {
LOG.info("Login is ok : " + message);
ctx.fireChannelRead(msg);
}
} else
//调用下一个channel链..
ctx.fireChannelRead(msg);
}
private NettyMessage buildLoginReq() {
NettyMessage message = new NettyMessage();
Head header = new Head();
header.setType(MessageType.HANDSHAKE_REQUEST.getType().bytevalue());
message.setHeader(header);
return message;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
握手响应处理器
package com.shu.Handler;
import com.shu.Pojo.Head;
import com.shu.Pojo.MessageType;
import com.shu.Pojo.NettyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {
private final static Log LOG = LogFactory.getLog(LoginAuthRespHandler.class);
private final Map nodeCheck = new ConcurrentHashMap();
private final String[] whitekList = {"127.0.0.1", "192.168.1.104"};
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
NettyMessage message = (NettyMessage) msg;
// 如果是握手请求消息,处理,其它消息透传
if (message.getHeader() != null && message.getHeader().getType() == MessageType.HANDSHAKE_REQUEST.getType()) {
// 获取远程IP地址
String nodeIndex = ctx.channel().remoteAddress().toString();
// 响应消息
NettyMessage loginResp = null;
// 重复登陆,拒绝
if (nodeCheck.containsKey(nodeIndex)) {
loginResp = buildResponse((byte) -1);
} else {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = address.getAddress().getHostAddress();
// 判断是否是白名单
boolean isOK = false;
for (String WIP : whitekList) {
if (WIP.equals(ip)) {
isOK = true;
break;
}
}
// 登录响应消息
loginResp = isOK ? buildResponse((byte) 0)
: buildResponse((byte) -1);
if (isOK)
nodeCheck.put(nodeIndex, true);
}
LOG.info("The login response is : " + loginResp
+ " body [" + loginResp.getBody() + "]");
ctx.writeAndFlush(loginResp);
} else {
ctx.fireChannelRead(msg);
}
}
private NettyMessage buildResponse(byte result) {
NettyMessage message = new NettyMessage();
Head header = new Head();
header.setType(MessageType.HANDSHAKE_RESPONSE.getType().bytevalue());
message.setHeader(header);
message.setBody(result);
return message;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
nodeCheck.remove(ctx.channel().remoteAddress().toString());// 删除缓存
ctx.close();
ctx.fireExceptionCaught(cause);
}
}
3.3.5.7 心跳检测
在凌晨等业务低谷期时段,如果发生网络闪断、连接被Hang 住等网络问题时,由于没有业务消息,应用进程很难发现。到了白天业务高峰期时,会发生大量的网络通信失败,严重的会导致一段时间进程内无法处理业务消息。为了解决这个问题,在网络空闲时采用心跳机制来检测链路的互通性,一旦发现网络故障,立即关闭链路,主动重连。(1)当网络处于空闲状态持续时间达到T(连续周期T没有读写消息)时,客户端主动发送 Ping 心跳消息给服务端;(2)如果在下一个周期T到来时客户端没有收到对方发送的Pong心跳应答消息或者读取到服务端发送的其他业务消息,则心跳失败计数器加1;(3)每当客户端接收到服务的业务消息或者Pong应答消息,将心跳失败计数器清零;当连续N次没有接收到服务端的 Pong消息或者业务消息,则关闭链路,间隔INTERVAL时间后发起重连操作;(4)服务端网络空闲状态持续时间达到T后,服务端将心跳失败计数器加1;只要接收到客户端发送的Ping 消息或者其他业务消息,计数器清零;(5)服务端连续N次没有接收到客户端的Ping消息或者其他业务消息,则关闭链路,释放资源,等待客户端重连。通过Ping-Pong双向心跳机制,可以保证无论通信哪一方出现网络故障,都能被及时地检测出来。为了防止由于对方短时间内繁忙没有及时返回应答造成的误判,只有连续N次心跳检测都失败才认定链路已经损害,需要关闭链路并重建链路。当读或者写心跳消息发生IO 异常的时候,说明链路已经中断,此时需要立即关闭链路,如果是客户端,需要重新发起连接。如果是服务端,需要清空缓存的半包信息,等待客户端重连。
心跳请求处理器
package com.shu.Handler;
import com.shu.Pojo.Head;
import com.shu.Pojo.MessageType;
import com.shu.Pojo.NettyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.ScheduledFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.TimeUnit;
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {
private static final Log LOG = LogFactory.getLog(HeartBeatReqHandler.class);
//使用定时任务发送
private volatile ScheduledFuture> heartBeat;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
NettyMessage message = (NettyMessage) msg;
// 当握手成功后,Login响应向下透传,主动发送心跳消息
if (message.getHeader() != null && message.getHeader().getType() == MessageType.HANDSHAKE_RESPONSE.getType()) {
//NioEventLoop是一个Schedule,因此支持定时器的执行,创建心跳计时器
heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
} else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESPONSE.getType()) {
LOG.info("Client receive server heart beat message : ---> " + message);
} else
ctx.fireChannelRead(msg);
}
//Ping消息任务类
private static class HeartBeatTask implements Runnable {
private final ChannelHandlerContext ctx;
public HeartBeatTask(final ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
NettyMessage heatBeat = buildHeatBeat();
LOG.info("Client send heart beat messsage to server : ---> " + heatBeat);
ctx.writeAndFlush(heatBeat);
}
private NettyMessage buildHeatBeat() {
NettyMessage message = new NettyMessage();
Head header = new Head();
header.setType(MessageType.HEARTBEAT_REQUEST.getType().bytevalue());
message.setHeader(header);
return message;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
}
}
心跳响应处理器
package com.shu.Handler;
import com.shu.Pojo.Head;
import com.shu.Pojo.MessageType;
import com.shu.Pojo.NettyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {
private static final Log LOG = LogFactory.getLog(HeartBeatRespHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
NettyMessage message = (NettyMessage) msg;
// 返回心跳应答消息
if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.HEARTBEAT_REQUEST.getType()) {
LOG.info("Receive client heart beat message : ---> "
+ message);
NettyMessage heartBeat = buildHeatBeat();
LOG.info("Send heart beat response message to client : ---> "
+ heartBeat);
ctx.writeAndFlush(heartBeat);
} else
ctx.fireChannelRead(msg);
}
private NettyMessage buildHeatBeat() {
NettyMessage message = new NettyMessage();
Head header = new Head();
header.setType(MessageType.HEARTBEAT_REQUEST.getType().bytevalue());
message.setHeader(header);
return message;
}
}
心跳超时的机制非常简单,直接利用Netty的ReadTimeoutHandler进行实现,当一定周期内(50s)没有接收到任何对方消息时,需要主动关闭链路。如果是客户端,则重新发起连接,如果是服务端,则释放资源,清除客户端登录缓存信息,等待服务器端重连。 3.3.5.8 服务端,客服端
服务端
package com.shu.Pojo;
public final class NettyConstant {
public static final String REMOTEIP = "127.0.0.1";
public static final int PORT = 8080;
public static final int LOCAL_PORT = 12088;
public static final String LOCALIP = "127.0.0.1";
}
package com.shu.Server;
import com.shu.Handler.HeartBeatRespHandler;
import com.shu.Handler.LoginAuthRespHandler;
import com.shu.MyByteOutput.NettyMessageDecoder;
import com.shu.MyByteOutput.NettyMessageEncoder;
import com.shu.Pojo.NettyConstant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
public class NettyServer {
private static final Log LOG = LogFactory.getLog(NettyServer.class);
public void bind() throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch)
throws IOException {
ch.pipeline().addLast(
new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast(new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler",
new ReadTimeoutHandler(50));
ch.pipeline().addLast(new LoginAuthRespHandler());
ch.pipeline().addLast("HeartBeatHandler",
new HeartBeatRespHandler());
}
});
// 绑定端口,同步等待成功
b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
LOG.info("Netty server start ok : "
+ (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
}
public static void main(String[] args) throws Exception {
new NettyServer().bind();
}
}
客户端
package com.shu.Client;
import com.shu.Handler.HeartBeatReqHandler;
import com.shu.Handler.LoginAuthReqHandler;
import com.shu.MyByteOutput.NettyMessageDecoder;
import com.shu.MyByteOutput.NettyMessageEncoder;
import com.shu.Pojo.NettyConstant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class NettyClient {
private static final Log LOG = LogFactory.getLog(NettyClient.class);
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
EventLoopGroup group = new NioEventLoopGroup();
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast("MessageEncoder",
new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler",
new ReadTimeoutHandler(50));
ch.pipeline().addLast("LoginAuthHandler",
new LoginAuthReqHandler());
ch.pipeline().addLast("HeartBeatHandler",
new HeartBeatReqHandler());
}
});
// 发起异步连接操作
ChannelFuture future = b.connect(
new InetSocketAddress(host, port),
new InetSocketAddress(NettyConstant.LOCALIP,
NettyConstant.LOCAL_PORT)).sync();
// 当对应的channel关闭的时候,就会返回对应的channel。
// Returns the ChannelFuture which will be notified when this channel is closed. This method always returns the same future instance.
future.channel().closeFuture().sync();
} finally {
// 所有资源释放完成之后,清空资源,再次发起重连操作
executor.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
try {
connect(NettyConstant.PORT, NettyConstant.REMOTEIP);// 发起重连操作
} catch (Exception e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) throws Exception {
new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
}



