栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

『玩转 Netty』Netty 模型

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

『玩转 Netty』Netty 模型

文章目录
  • 三、Netty
    • 3.1 原生 NIO 存在的问题
    • 3.2 线程模型
    • 3.3 Netty 模型
    • 3.4 入门实例
    • 3.5 taskQueue 自定义任务
    • 3.6 异步模型原理
    • 3.7 HTTP 服务案例
    • 3.8 Netty 核心模块
    • 3.9 Pipline 组件
    • 3.10 进阶实例
    • 3.11 心跳检测
    • 3.12 WebSocket 长连接

三、Netty 3.1 原生 NIO 存在的问题

1、NIO 库和 API 复杂,使用麻烦。

2、需要具备 Java 多线程,网络编程能力。

3、开发工总量大,难度大;断线重连,网络拥塞控制等。

4、BIO 的 bug,Epoll Bug 会导致 Selector 空轮询,最终导致 CPU 100%。

3.2 线程模型

1、传统阻塞 I/O 模型

特点:

  • 阻塞 IO 获取输入数据。

  • 每个连接需要独立线程。

缺点:

  • 并发数大,创建大量线程,浪费资源。

  • 线程无数据可读时,会阻塞在 read,浪费资源。

2、Reactor 模型

  • 基于 IO 复用,多个连接公用一个阻塞对象。
  • 基于线程池,一个线程处理多个连接。
  • 负责监听和分发事件,中介者。

3、单 Reactor 单线程。

上文的NIO 聊天室,就是采用模式的。

优点:

  • 简单,所有都在一个线程中完成。

缺点:

  • 无法发挥多核 CPU 的性能。
  • 线程意外终止或死循环,会导致整个系统不可用,节点故障。

使用场景:客户端少,业务处理块。

4、单 Reactor 多线程。

优点:

  • 充分利用多核 CPU。

缺点:

  • 多线程数据共享,访问复杂。
  • reactor 处理所有事件的监听和响应,在单线程中运行,高并发时出现性能瓶颈。

5、主从 Reactor 多线程。[Netty 基于它]

优点:

  • Reactior 父子线程任务明确,父线程只接收新连接,子线程负责业务处理。
  • 父子线程交互简单,父线程只需把连接给子线程,子线程无需返回数据。

缺点:

  • 实现复杂。
3.3 Netty 模型

简单版:

详解版:

1、Netty 有两组线程池:BossGroup[只接收新连接],WorkGroup[业务逻辑]。

2、BossGroup 和 WorkGroup 都是 NioEventLoopGroup 类型。

3、每个 NioEventLoop 都有 selector,用于绑定 socket 网络通讯。

3.4 入门实例

1、客户端给服务器发送信息,服务器回复客户端 “你好,客户端”。

package com.sugar.netty.simple;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;


public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        // 创建 bossgroup 和 workergroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 服务器启动对象 配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup,workerGroup)  // 设置两个线程组
                    .channel(NioServerSocketChannel.class)  // 使用 NioServerSocketChannel
                    .option(ChannelOption.SO_BACKLOG,128)   // 设置线程队列 连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)  // 设置保持活动连接状态
                    .childHandler(new ChannelInitializer() {
                        // 给管道设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("服务器准备完成!");

            // 绑定端口 - 同步处理
            // 启动服务器
            ChannelFuture cf = bootstrap.bind(6666).sync();

            // 关闭通道监听
            cf.channel().closeFuture().sync();
        }finally {
            // 出现异常直接关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.sugar.netty.simple;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;



public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }

    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 回复客户端消息
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,客户端",StandardCharsets.UTF_8));
    }

    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("ctx = " + ctx);

        // 接受客户端消息
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送:" + buf.toString(StandardCharsets.UTF_8));
        System.out.println("客户端地址:" + ctx.channel().remoteAddress());
    }


}

一些关系的图示:

3.5 taskQueue 自定义任务

1、用户自定义普通任务。

// 解决方案1:用户自定义普通任务
ctx.channel().eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(10 * 1000);
        }catch (Exception e) {
            System.out.println("自定义普通任务 执行异常");
        }
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,客户端2",StandardCharsets.UTF_8));
    }
});

2、用户自定义定时任务。

// 解决方案2:用户自定义定时任务
ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,客户端2",StandardCharsets.UTF_8));
    }
},5, TimeUnit.SECONDS);

3、非当前 Reactor 调用 Channel 方法。

3.6 异步模型原理

介绍

1、调用者不能立刻得到结果,调用完成后通过回调通知调用者。

2、Netty 的 IO 操作是异步的。

Future

1、异步执行的结果,通过它的方法检测执行是否完成。

2、ChannelFulture 是一个接口,可以添加监听器,监听事件发生时通知监听器。

Future Listener 机制

1、Future 创建时处于非完成状态,可以用过 ChannelFulture 查询执行状态、注册监听器等。

2、案例:实现异步绑定端口,绑定完毕后调用相应监听器输出是否成功。

// 给 ChannelFuture 注册监听器,监控是否注册成功,并输出
cf.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("绑定端口6666成功");
        }else {
            System.out.println("绑定失败:" + channelFuture.isCancellable());
        }
    }
});

3、相比传统 IO,线程会被阻塞直到操作完成(这里指绑定端口);异步则不会阻塞,可以去做别的事,当绑定成功,监听器会通知你。

3.7 HTTP 服务案例

Netty 可以做 HTTP 服务开发

每个客户端独享一个 Pipline,每个 PipLine 存放多个 Handler。

案例:监听 6677 端口,服务器给客户端回复 “Hello Netty”,并过滤特定请求资源。

服务端:

package com.sugar.netty.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;


public class HttpServer {
    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new HttpServerInitializer());

            ChannelFuture cf = serverBootstrap.bind(6677).sync();
            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务器初始化器:

package com.sugar.netty.http;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;


public class HttpServerInitializer extends ChannelInitializer {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 获取管道
        ChannelPipeline pipeline = socketChannel.pipeline();

        // 加入解码器
        pipeline.addLast("我的解码器",new HttpServerCodec());

        // 加入自定义处理器
        pipeline.addLast("我的处理器",new HttpServerHandler());
    }
}

处理器:

package com.sugar.netty.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

import java.net.URI;


public class HttpServerHandler extends SimpleChannelInboundHandler {
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject) throws Exception {
        // 判断 msg 是不是一个 request 请求
        if (httpObject instanceof HttpRequest) {
            System.out.println("管道哈希值:" + ctx.pipeline().hashCode() + " 处理器哈希值:" + this.hashCode());
            System.out.println("客户端地址:" + ctx.channel().remoteAddress());

            // 请求过滤
            HttpRequest request = (HttpRequest) httpObject;
            URI uri = new URI(request.uri());
            if ("/favicon.ico".equals(uri.getPath())) {
                System.out.println("此请求被过滤");
                return;
            }

            // 回复浏览器 [回复客户端 - http协议]
            ByteBuf buf = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8);

            // 构造 Http 协议 httpResponse
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,buf);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH,buf.readableBytes());

            // 发送
            ctx.writeAndFlush(response);
        }
    }
}
3.8 Netty 核心模块

1、Bootstrap,ServerBootstrap

  • Bootstrap配置整个 Netty 程序 ,串联组件,是客户端引导类,ServerBootstrap 是服务端引导类。

2、Future,ChannelFuture

  • Netty 所有操作都是异步,无需立刻获得结果,而由监听器通知调用者和触发监听事件。

3、Channel

  • 网络通信组件,执行异步 IO 操作。

4、Selector

  • 基于 Selector 实现多路复用,一个 Selector 可以监听多个 Channel。
  • Channel 向 Selector 注册,不断查询 Channel 中是否有就绪事件,

5、ChannelHandler

  • 处理 IO 事件,并转发到 Pipline 下一个 Handler 中。
3.9 Pipline 组件

1、Pipline 相当与一个链表,储存了许多 Handler,类似一种链式过滤器模式。

2、Channel 包含的 ChannelPipline 中维护了一个 ChannelHandlerContext 组成的双向链表,每个 ChannelHanderContext 关联着一个 ChannelHandler。

3.10 进阶实例

Netty 群聊系统

要求:实现多人群聊系统。服务端监控用户上下线,消息转发;客户端发送消息给其他用户。

package com.sugar.netty.XChatGroup;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;


public class GroupClient {
    private final String host;
    private final int port;

    public GroupClient (String host,int port) {
        this.host = host;
        this.port = port;
    }

    public void run () throws InterruptedException {
        // 创建线程组
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 添加处理器
                            ChannelPipeline pipeline = socketChannel.pipeline();

                            pipeline.addLast("我的解码器",new StringDecoder());
                            pipeline.addLast("我的编码器",new StringEncoder());
                            pipeline.addLast("我的业务处理器",new GroupClientHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.connect(host, port).sync();
            cf.channel().closeFuture();

            //----------------------------------------------------------------------------------
            Channel channel = cf.channel();
            System.out.println("[客户端 - " + channel.localAddress() + "] 启动完毕");

            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                // 发送数据
                channel.writeAndFlush(msg + "n") ;
            }
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GroupClient groupClient = new GroupClient("127.0.0.1", 7000);
        groupClient.run();
    }
}
package com.sugar.netty.XChatGroup;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;


public class GroupClientHandler extends SimpleChannelInboundHandler {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }
}
package com.sugar.netty.XChatGroup;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;



public class GroupServer {
    private final int port;

    public GroupServer (int port) {
        this.port = port;
    }

    // 处理客户端请求
    public void run () throws InterruptedException {
        // 创建线程组
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup(8);

        try {
            // 启动器
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss,work)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 添加处理器
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("我的解码器",new StringDecoder());
                            pipeline.addLast("我的编码器",new StringEncoder());
                            pipeline.addLast("我的业务处理器",new GroupServerHandler());
                        }
                    });

            System.out.println("Netty 启动完成");
            ChannelFuture cf = bootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        GroupServer groupServer = new GroupServer(7000);
        groupServer.run();
    }
}
package com.sugar.netty.XChatGroup;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;


public class GroupServerHandler extends SimpleChannelInboundHandler {
    // 定义所有 Channel 组,方便管理
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 推送给其他客户端,当前用户上线了
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端 - " + channel.remoteAddress() + "] 加入聊天!n");
        System.out.println("当前在线用户:" + channelGroup.size());

        // 加入频道组
        channelGroup.add(channel);
    }

    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端 - " + channel.remoteAddress() + "] 离开聊天!n");
        System.out.println("当前在线用户:" + channelGroup.size());
    }

    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[客户端 - " + ctx.channel().remoteAddress() + "] 上线!n");
    }

    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[客户端 - " + ctx.channel().remoteAddress() + "] 离线!n");
    }

    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        // 发送者频道
        Channel channel = ctx.channel();

        // 转发,除去自己
        channelGroup.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush("[客户端 - " + channel.remoteAddress() + "] 说:" + s + "n");
            }
        });
    }

    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
3.11 心跳检测

实时感知客户端是否存在

心跳检测器会会将事件转发给管道下一个处理器。

// 心跳检测器
pipeline.addLast("心跳检测器",new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
// 心跳处理器
pipeline.addLast("心跳处理器",new ServerHandler());
package com.sugar.netty.HeartBeat;

import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;


public class ServerHandler extends SimpleChannelInboundHandler {

    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                    eventType = "读空闲";
                    break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    break;
                case ALL_IDLE:
                    eventType = "读写空闲";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "超时时间:" + eventType);
            System.out.println("服务器进行相关处理.");
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {

    }
}
3.12 WebSocket 长连接

Http 协议是无状态的,请求响应一次,下次仍会建立新连接。

案例:

  • 实现基于 WebSocket 的长连接全双工交互。
  • 服务器,客户端可以互发消息;并且可以相互感知,谁关闭。
package com.sugar.netty.WebSocket;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;



public class Server {
    public static void main(String[] args) throws InterruptedException {
        // 创建线程组
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss,work)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))  // boss 添加日志处理器
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();

                            // 基于 http 协议,加入 http 解码器编码器
                            pipeline.addLast(new HttpServerCodec());

                            // 以块方式写,加入 块处理器
                            pipeline.addLast(new ChunkedWriteHandler());

                            // http 数据传输中是分段的,加入 段聚合处理器
                            pipeline.addLast(new HttpObjectAggregator(8192));

                            // websocket 数据以帧传输
                            // 浏览器请求URL: ws://127.0.0.1:7000/hello
                            // 核心功能将 http协议 ------> ws协议
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));

                            // 我的业务处理器
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.bind(7000).sync();
            cf.channel().closeFuture().sync();
            System.out.println("服务器启动完毕");
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
package com.sugar.netty.WebSocket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketframe;

import java.time.LocalDateTime;


public class ServerHandler extends SimpleChannelInboundHandler {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 唯一 id
        System.out.println("[handlerRemoved] " + ctx.channel().id().asLongText());
        System.out.println("[handlerRemoved] " + ctx.channel().id().asShortText());
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 唯一 id
        System.out.println("[handlerAdded] " + ctx.channel().id().asLongText());
        System.out.println("[handlerAdded] " + ctx.channel().id().asShortText());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketframe textWebSocketframe) throws Exception {
        // 接受消息
        System.out.println("收到消息:" + textWebSocketframe.text());

        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketframe("[服务器] - " + LocalDateTime.now()
        + " " + textWebSocketframe.text()));
    }
}



    
    
    
    WebSocket长连接 Demo


    
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/692984.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号