栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

netty如何实现优化百万并发http请求

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

netty如何实现优化百万并发http请求

问题引入:如果项目存在数量大的并发,如何优化项目性能

  • 首先判断项目性能瓶颈是什么, CPU,还是I/O请求
    大部分是网络I/o瓶颈,主要是因为网络请求是阻塞,导致太慢

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
JDK 原生 NIO 程序的问题
JDK 原生也有一套网络应用程序 API,但是存在一系列问题,主要如下:
NIO 的类库和 API 繁杂,使用麻烦。你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
需要具备其他的额外技能做铺垫。例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。
可靠性能力补齐,开发工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。
NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。
JDK NIO 的 Bug。例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。
官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决。
Netty 的特点
Netty 对 JDK 自带的 NIO 的 API 进行封装,解决上述问题,主要特点有:
设计优雅,适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 起)。
使用方便,详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
高性能,吞吐量更高,延迟更低;减少资源消耗;最小化不必要的内存复制。
安全,完整的 SSL/TLS 和 StartTLS 支持。
社区活跃,不断更新,社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。

Netty 常见使用场景

Netty 常见的使用场景如下:
互联网行业。在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。
典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
游戏行业。无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。
大数据领域。经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
有兴趣的读者可以了解一下目前有哪些开源项目使用了 Netty:Related Projects。
Netty 高性能设计
Netty 作为异步事件驱动的网络,高性能之处主要来自于其 I/O 模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据。

netty实现http客户端解决网络I/O瓶颈
@Component
@Order(2)
public class HttpClient {
    public static void start() {
        EventLoopGroup group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer() {

                        @Override
                        protected void initChannel(Channel channel)
                                throws Exception {
                            
                            //请求编码
                            channel.pipeline().addLast(new HttpRequestEncoder());
                            //响应解码
                            channel.pipeline().addLast(new HttpResponseDecoder());
                            channel.pipeline().addLast(new HttpClientHandler());
                        }
                    });
            SimpleClientHttpRequestFactory httpRequestFactory = new SimpleClientHttpRequestFactory();
            SocketAddress address = new InetSocketAddress("net-proxy-pub-test.sftcwl.com", 1080);
            SocketAddress address1 = new InetSocketAddress("gis.sf-express.com", 8080);
            Proxy proxy = new Proxy(Proxy.Type.HTTP, address);
            httpRequestFactory.setProxy(proxy);

            ChannelFuture future = bootstrap.connect("localhost",8162).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }


    public static void main(String[] args) {
        start();
    }
public class HttpClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String CLOSE_FREE_CHANNEL = "closeFreeChannel";
    
    public static final String CLOSE_CHANNEL_CAUSE = "closeChannelCause";
    private static Set channelIDSet = new CopyOnWriteArraySet<>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
     
         URI uri = new URI("/abc/project/list");
        Map param = new HashMap<>(2);
        param.put("address", "雁塔区");
        param.put("ak", "58008e825a944a45909e056be12f511f");
        String prefix = "?";
        StringBuilder urlBuilder = new StringBuilder();
        for (String key : param.keySet()) {
            urlBuilder.append(prefix).append(key).append("=").append(param.get(key));
            prefix = "&";
        }
        LOGGER.info(urlBuilder.toString());
        //配置HttpRequest的请求数据和一些配置信息
        FullHttpRequest request = new DefaultFullHttpRequest(
                HTTP_1_1,
                HttpMethod.GET,
                uri.toASCIIString()
                );
        LOGGER.info(request.toString());

        request.headers()
                .set(HttpHeaderNames.HOST, "127.0.0.1")
                .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
                //开启长连接
                .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
                //设置传递请求内容的长度
                .set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());

        ctx.writeAndFlush(request);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("msg -> " + msg);
        if (msg instanceof HttpContent) {
            HttpContent content = (HttpContent) msg;
            ByteBuf buf = content.content();
            String result = buf.toString(CharsetUtil.UTF_8);


            System.out.println("response -> " + result);
            LOGGER.info(result);
        }
    }

    
    @Override
    public void channelInactive(final ChannelHandlerContext ctx) {
        LOGGER.info("Channel为 {} 断开,Disconnected Server:{}", ctx.channel().id(), ctx.channel().remoteAddress());
        EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(new Runnable() {
            @Override
            public void run() {
                SocketChannel channel = (SocketChannel) ctx.channel();
                int port = channel.remoteAddress().getPort();
                //获取channel中存储的关闭连接的原因,如果是关闭空闲连接,则不再重连
                String closeChannelCause = channel.attr(AttributeKey.valueOf(CLOSE_CHANNEL_CAUSE)).get();
                if (CLOSE_FREE_CHANNEL.equals(closeChannelCause)) {
                    LOGGER.info("检测到Channel为 {} 的空闲连接,已经断开.", channel.id());
                    return;
                }
                LOGGER.info("检测到与服务端断开连接,NettyClient开始重连{}:{}", channel.remoteAddress().getHostName(), port);
            }
        }, 3, TimeUnit.SECONDS);
        ctx.fireChannelInactive();

    }

    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        Channel channel = ctx.channel();
        if (evt instanceof IdleStateEvent) {
            if (channel.isActive()) {
                LOGGER.info("[客户端心跳监测] 通道编号: {} ,客户端信息: {}", channel.id(), channel.localAddress());
                channelIDSet.add(channel.id().toString());
                LOGGER.info("[客户端心跳监测] 关闭当前空闲,通道编号: {} ,客户端信息: {}", channel.id(), channel.localAddress());
                return;
            }
        }
    }

    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("Socket通信过程中出现异常,异常原因为:", cause);
        super.exceptionCaught(ctx, cause);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/306267.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号