栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Netty编解码&粘包拆包&心跳机制&断线重连

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Netty编解码&粘包拆包&心跳机制&断线重连

文章目录
  • 一、Netty编解码
    • 1.1 ChannelHandler
    • 1.2 ChannelPipeline
    • 1.3 编码解码器
  • 二、Netty粘包拆包
    • 2.1 特殊分隔符编解码器示例
    • 2.2 自定义长度分包编解码器示例(自定义编解码器)
  • 三、Netty心跳检测机制
  • 四、Netty断线自动重连实现

一、Netty编解码

Netty涉及到编解码的组件有Channel、ChannelHandler、ChannelPipe等,先大概了解下这几个组件的作用。

1.1 ChannelHandler

ChannelHandler充当了处理入站和出站数据的应用程序逻辑容器。

例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据随后会被你的应用程序的业务逻辑处理。当你要给连接的客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。我们的业务逻辑通常写在一个或者多个ChannelInboundHandler中。

ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。

1.2 ChannelPipeline

ChannelPipeline提供了ChannelHandler链的容器。

以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler(ChannelOutboundHandler调用是从tail到head方向逐个调用每个handler的逻辑),并被这些Handler处理,反之则称为入站的。

入站只调用pipeline里的ChannelInboundHandler逻辑(ChannelInboundHandler调用是从head到tail方向逐个调用每个handler的逻辑)。

1.3 编码解码器

当你通过Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节(注意,这里是以客户端为参考的)。

Netty提供了一系列实用的编码解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由已知解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。

注意,业务的handler一定要放在编解码handler之后!!!因为这些handler都是按照出站或者入栈的顺序执行的! 在服务端角度来看,出站就是从服务端发送给客户端,即从tail到header,所以一定要先执行业务的handler然后再经过编码器转化成二进制数据才能发送到客户端去。二入站就是接受客户端的二进制数据,是从header到tail,必须先经过解码器然后再进行业务的handler。

Netty提供了很多编解码器,比如编解码字符串的StringEncoder和StringDecoder,编解码对象的ObjectEncoder和ObjectDecoder等。

如果要实现高效的编解码可以用protobuf,但是protobuf需要维护大量的proto文件比较麻烦,现在一般可以使用protostuff。

protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用我们写.proto文件来实现序列化。使用它也非常简单,代码如下:

引入依赖:


    com.dyuproject.protostuff
    protostuff-api
    1.0.10


    com.dyuproject.protostuff
    protostuff-core
    1.0.10


    com.dyuproject.protostuff
    protostuff-runtime
    1.0.10

protostuff使用示例:

package com.jihu.netty.protostuff_demo;

import com.dyuproject.protostuff.linkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class ProtostuffUtil {

    private static Map, Schema> cachedSchema = new ConcurrentHashMap, Schema>();

    private static  Schema getSchema(Class clazz) {
        @SuppressWarnings("unchecked")
        Schema schema = (Schema) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    
    public static  byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class clazz = (Class) obj.getClass();
        linkedBuffer buffer = linkedBuffer.allocate(linkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    
    public static  T deserializer(byte[] data, Class clazz) {
        try {
        	// 对象必须有无参构造
            T obj = clazz.newInstance();
            Schema schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        byte[] userBytes = ProtostuffUtil.serializer(new User(1, "xiaoyan"));
        User user = ProtostuffUtil.deserializer(userBytes, User.class);
        System.out.println(user);
    }
}

public class User {
    Integer id;
    String name;
	
	// 无参构造后面的ProtostuffUtil序列化和反序列化方法反射实例化的时候需要 
    public User() {
    }

    public User(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + ''' +
                '}';
    }
} 

那我们如何在Netty中使用呢?

这个工具类不用改变,copy使用即可。在我们业务的handler中在需要发送或者接收数据的地方调用这个工具类的serializer方法和deserializer方法即可。

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buffer = Unpooled.copiedBuffer(ProtostuffUtil.serializer(new User(1, "xiaoyan")));
        ctx.writeAndFlush(buffer);
    }
...    
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程: " + Thread.currentThread().getName());

        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline();

        // 将msg转成一个 ByteBuf, 类似NIO的ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        System.out.println("接收到客户端消息: " + ProtostuffUtil.deserializer(bytes, User.class));
    }
...    

虽然这样也可以使用,但是显然不是最好的。我们能不能像使用ObjectEncoder和ObjectDecoder一样,直接添加到channelpipeline中呢?这样我们只来处理对象就好了,不用再关注编解码了!

我们先来看粘包和拆包。

二、Netty粘包拆包

TCP是一个流协议,就是没有界限的一长串二进制数据。

TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。

如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。

我们用实际代码演示一下这个问题。我们改造一下上次的聊天室程序。

我们启动服务端一个一个客户端之后,然后修改一下客户端的发送代码:

我们将ChatClient中键盘输入的消息修改成一次性发送200条数据:

            // 客户端需要创建一个扫描器来接收输入的消息
//            Scanner scanner = new Scanner(System.in);
//            while (scanner.hasNext()) {
//                String msg = scanner.nextLine();
//                // 通过channel消息发送到服务器
//                channel.writeAndFlush(msg);
//            }
            for (int i = 0; i < 200; i++) {
                channel.writeAndFlush("Hello xiaoyan");
            }

然后我们再启动一个新的客户端让其发送数据,然后观察第一个客户端收到的消息:

可以看到,这里发生了粘包问题。并不是像我们所想的一样,一条消息通知一次,而是将多条消息一起进行了发送。

解决方案:
1、消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格
2、在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不能出现分隔符(服务端需要解析并根据这些分隔符分割消息)
3、【推荐使用】发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束

Netty提供了多个解码器,可以进行分包的操作,如下:

  • LinebasedframeDecoder (回车换行分包)
  • DelimiterbasedframeDecoder(特殊分隔符分包)
  • FixedLengthframeDecoder(固定长度报文来分包)
2.1 特殊分隔符编解码器示例

服务端:

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 设置两个线程组
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ChannelInitializer() { // 创建通道初始化对象, 设置初始化参数
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                		// 假如特殊分隔符编码器: param1: 可发送数据大小; param2: 分隔符
                        .addLast(new DelimiterbasedframeDecoder(1024, Unpooled.copiedBuffer("_".getBytes())))
                        // 向pipeline添加解码器
                        .addLast("decoder", new StringDecoder())
                        // 向pipeline添加解码器
                        .addLast("encoder", new StringEncoder())
                        .addLast(new ChatServerHandler());
            }
        });
System.out.println("聊天室创建成功...");

客户端(注意这个解码器写需要配置到客户端,因为其实现在的消息是由服务端转发到不同的客户端的):

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group) // 设置线程组
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer() {

            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                        // 假如特殊分隔符编码器: param1: 可发送数据大小; param2: 分隔符
                        .addLast(new DelimiterbasedframeDecoder(1024, Unpooled.copiedBuffer("_".getBytes())))
                        // 向pipeline添加解码器
                        .addLast("decoder", new StringDecoder())
                        // 向pipeline添加解码器
                        .addLast("encoder", new StringEncoder())
                        .addLast(new ChatClientHandler());
            }
        });

ChannelFuture future = bootstrap.connect("127.0.0.1", 9000).sync();

------------------------------
for (int i = 0; i < 200; i++) {
    channel.writeAndFlush("Hello, xiaoyan!" + "_");
}


注意这这种使用特殊分隔符的弊端,就是如果发送的内容中也包含了分隔符,该条消息也会被分割!

2.2 自定义长度分包编解码器示例(自定义编解码器)

MyClient :

public class MyClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new MyMessageEncoder());
                            pipeline.addLast(new MyClientHandler());
                        }
                    });
            System.out.println("Netty client start...");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }

    }
}

MyClientHandler:

public class MyClientHandler extends SimpleChannelInboundHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            String msg = "你好, 我是张三!";
            byte[] msgBytes = msg.getBytes(CharsetUtil.UTF_8);
            // 创建协议包对象
            MyMessageProtocol protocol = new MyMessageProtocol();
            protocol.setLen(msgBytes.length);
            // 我们只能发送发送二进制数据, 想发送对象的话需要自己实现编解码器
            protocol.setContent(msgBytes);
            // 发送到服务端
            ctx.writeAndFlush(protocol);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyMessageProtocol myMessageProtocol) throws Exception {
    }
}

MyServer :

public class MyServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(new MyMessageDecoder());
                    pipeline.addLast(new MyServerHandler());
                }
            });
            System.out.println("Netty server start...");
            ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

MyServerHandler :

public class MyServerHandler extends SimpleChannelInboundHandler {

    int num = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyMessageProtocol myMessageProtocol) throws Exception {
        System.out.println("===服务端接收到的消息如下===");

        System.out.println("长度=" + myMessageProtocol.getLen());
        System.out.println("内容=" + new String(myMessageProtocol.getContent(), CharsetUtil.UTF_8));
        System.out.println("服务端接收到的消息包数量=" + (++num) + "n");

    }
}

MyMessageDecoder:

public class MyMessageDecoder extends ByteToMessageDecoder {

    int len = 0;

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
        System.out.println("MyMessageDecoder decode 方法被调用了...");

        // 需要将得到的二进制字节码 -> MyMessageProtocol 数据包对象
        System.out.println(byteBuf);

        // 判断可读数据是否大于等于4
        if (byteBuf.readableBytes() >= 4) {
            if (len == 0) {
                // 读取并设置本次内容长度(此时消息格式是: len+content)
                len = byteBuf.readInt();
            }

            // 这里需要判断内容是否也发送过来了, 因为可能发生拆包导致只发送了数据长度
            if (byteBuf.readableBytes() < len) {
                System.out.println("当前可读数据不够, 继续等待...");
                return;
            }

            byte[] content = new byte[len];
            if (byteBuf.readableBytes() >= len) {
                byteBuf.readBytes(content);

                // 封装成MyMessageProtocol对象, 传递到下一个handler业务处理
                MyMessageProtocol protocol = new MyMessageProtocol();
                protocol.setLen(len);
                protocol.setContent(content);
                list.add(protocol);
            }

            len = 0;
        }
    }
}
 

MyMessageEncoder :

public class MyMessageEncoder extends MessageToByteEncoder {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MyMessageProtocol myMessageProtocol, ByteBuf byteBuf) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被调用了");

        byteBuf.writeInt(myMessageProtocol.getLen());
        byteBuf.writeBytes(myMessageProtocol.getContent());
    }
}

MyMessageProtocol :

public class MyMessageProtocol {
    // 定义一次发送包体长度
    private int len;
    // 一次发送包体内容
    private byte[] content;

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

【注意】拆包粘包如果不进行处理会随机发生,也有可能不发生。

三、Netty心跳检测机制

所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性。

在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 看下它的构造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

这里解释下三个参数的含义:

  • readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件
  • writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件
  • allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件

注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:

IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)

要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码:

pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

初步地看下IdleStateHandler源码,先看下IdleStateHandler中的channelRead方法:

红框代码其实表示该方法只是进行了透传,不做任何业务逻辑处理,让channelPipe中的下一个handler处理channelRead方法。

我们再看看channelActive方法:

这里有个initialize的方法,这是IdleStateHandler的精髓,接着探究:

这边会触发一个Task,ReaderIdleTimeoutTask,这个task里的run方法源码是这样的:

第一个红框代码是用当前时间减去最后一次channelRead方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了,那么第二个红框代码则会触发下一个handler的userEventTriggered方法:

如果没有超时则不触发userEventTriggered方法。

【注意】

我们发现这里的定时任务并不是使用的Timer之类的每隔几秒就执行一次的定时任务器,而是重复去调用schedule方法。

并且当超时的时候,delay时间设置的是我们自定义的超时时间,也就是3秒。但是如果没有超时的话,这个delay时间并不是我们自定义的3秒。而是通过下面计算得来:
nextDelay -= ticksInNanos() - lastReadTime;


我们在channelReadComplete方法中可以看到,每次读取完毕后,都会将现在的时间赋值给lastReadTime。
而nextDelay的时间是:自定义超时时间 - (当前时间 - 上一次读时间)
如果结果是小于等于0表示超时了,需要重新赋值delay时间为自定义超时时间。
如果结果大于0表示未超时,则nextDelay时间就等于计算后的。而不是固定的自定义超时时间,因为此时距离上次读操作之间已经过了一段时间,就是 (当前时间 - 上一次读时间) 这段时间,而我们设置下一次delay时间应当是自定义事件减去已经过了的这段时间才是准确的!!!

举例来讲就是上次读操作已经是2秒之前了,那么我设置下一次delay时间应当是1s,而不是自定义的3s。

Netty心跳检测代码示例:

//服务端代码
public class HeartBeatServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            //IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,
                            //会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须
                            //实现userEventTriggered方法处理对应事件
                            pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                            pipeline.addLast(new HeartBeatHandler());
                        }
                    });
            System.out.println("netty server start。。");
            ChannelFuture future = bootstrap.bind(9000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
//服务端处理handler
public class HeartBeatServerHandler extends SimpleChannelInboundHandler {

    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
        if ("Heartbeat Packet".equals(s)) {
            ctx.channel().writeAndFlush("ok");
        } else {
            System.out.println(" 其他信息处理 ... ");
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;

        String eventType = null;
        switch (event.state()) {
            case READER_IDLE:
                eventType = "读空闲";
                readIdleTimes++; // 读空闲的计数加1
                break;
            case WRITER_IDLE:
                eventType = "写空闲";
                // 不处理
                break;
            case ALL_IDLE:
                eventType = "读写空闲";
                // 不处理
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
        // 如果超时超过3次就关闭channel
        if (readIdleTimes > 3) {
            System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
            ctx.channel().writeAndFlush("idle close");
            ctx.channel().close();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
    }
}
//客户端代码
public class HeartBeatClient {
 public static void main(String[] args) throws Exception {
     EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
     try {
         Bootstrap bootstrap = new Bootstrap();
         bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer() {
                     @Override
                     protected void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline pipeline = ch.pipeline();
                         pipeline.addLast("decoder", new StringDecoder());
                         pipeline.addLast("encoder", new StringEncoder());
                         pipeline.addLast(new HeartBeatClientHandler());
                     }
                 });

         System.out.println("netty client start。。");
         Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
         String text = "Heartbeat Packet";
         Random random = new Random();
         while (channel.isActive()) {
             int num = random.nextInt(10);
             // 0~7的随机数,有概率超过3s超时
             Thread.sleep(num * 1000);
             channel.writeAndFlush(text);
         }
     } catch (Exception e) {
         e.printStackTrace();
     } finally {
         eventLoopGroup.shutdownGracefully();
     }
 }

 static class HeartBeatClientHandler extends SimpleChannelInboundHandler {

     @Override
     protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
         System.out.println(" client received :" + msg);
         if (msg != null && msg.equals("idle close")) {
             System.out.println(" 服务端关闭连接,客户端也关闭");
             ctx.channel().closeFuture();
         }
     }
 }
}


总结来说,就是我们客户端向服务端每隔固定的时间发送一次数据。

而服务端需要添加IdleStateHandler,当超过设置的超时时间,就会调用userEventTriggered方法中的逻辑。我们在里面维护了一个读超时的次数,规定超过3次就关闭连接。

【注意】是否超时有Netty来实现,超时之后会调用我们自己业务handeler中的userEventTriggered方法。

四、Netty断线自动重连实现

1、客户端启动连接服务端时,如果网络或服务端有问题,客户端连接失败,可以重连,重连的逻辑加在客户端。

public class NettyClient {
    private final String host;
    private final int port;
    private Bootstrap bootstrap;
    private EventLoopGroup group;

    public static void main(String[] args) throws InterruptedException {
        NettyClient client = new NettyClient("localhost", 9000);
        client.connect();
    }


    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
        init();
    }

    private void init() {
        group = new NioEventLoopGroup();

        bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new NettyClientHandler(NettyClient.this));
                    }
                });

    }

    public void connect() {
        System.out.println("Netty client start...");
        ChannelFuture channelFuture = bootstrap.connect(host, port);
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // 重连交给后端线程执行
                    future.channel().eventLoop().schedule(() -> {
                        System.out.println("重连服务端...");
                        try {
                            connect();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }, 3000, TimeUnit.MICROSECONDS);
                } else {
                    System.out.println("服务端连接成功...");
                }
            }
        });

    }

}

2、系统运行过程中网络故障或服务端故障,导致客户端与服务端断开连接了也需要重连,可以在客户端处理数据的Handler的channelInactive方法中进行重连。

public class NettyClientHandler extends SimpleChannelInboundHandler {
    private NettyClient nettyClient;

    public NettyClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("运行中断开重连...");
        nettyClient.connect();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyMessageProtocol myMessageProtocol) throws Exception {
    }
}
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号