栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

netty 学习

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

netty 学习

普通阻塞式 socket (BIO)
  • client

通过 java.net.Socket 来获取流

import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
 * Minimal blocking (BIO) console client for a line-oriented echo protocol.
 *
 * <p>It connects to {@code localhost:6666}, prints the server's greeting line, then repeatedly
 * sends one line read from standard input and prints the one-line reply, until the reply is
 * {@code "bye"}, the server closes the connection, or standard input is exhausted.
 */
public class SocketClient {

    /**
     * Opens the connection, runs the interactive session and prints {@code "disconnected."} once
     * the session has ended normally.
     *
     * @param args unused
     * @throws IOException if connecting or talking to the server fails
     */
    public static void main(String[] args) throws IOException {
        // The socket itself is a managed resource, so it is closed even when handle() throws.
        try (Socket sock = new Socket("localhost", 6666);
                InputStream input = sock.getInputStream();
                OutputStream output = sock.getOutputStream()) {
            handle(input, output);
        }
        System.out.println("disconnected.");
    }

    /**
     * Runs the request/response loop over the given streams using UTF-8 text lines.
     *
     * @param input stream carrying the server's replies
     * @param output stream used to send requests
     * @throws IOException if reading from or writing to the connection fails
     */
    private static void handle(InputStream input, OutputStream output) throws IOException {
        BufferedWriter writer =
                new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8));
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
        Scanner scanner = new Scanner(System.in);
        System.out.println("[server] " + reader.readLine());
        while (true) {
            System.out.print(">>> "); // prompt
            if (!scanner.hasNextLine()) {
                // Standard input reached EOF: nothing more to send.
                break;
            }
            String request = scanner.nextLine();
            writer.write(request);
            writer.newLine();
            writer.flush();
            String resp = reader.readLine();
            if (resp == null) {
                // readLine() returns null at end of stream, i.e. the server closed the connection.
                System.out.println("[server] connection closed.");
                break;
            }
            System.out.println("<<< " + resp);
            if ("bye".equals(resp)) {
                break;
            }
        }
    }
}
  • server
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * Minimal blocking (BIO) server for a line-oriented echo protocol, one thread per connection.
 *
 * <p>Each client first receives the line {@code hello}. Every following request line is answered
 * with {@code ok: <line>}, except {@code bye}, which is answered with {@code bye} and ends the
 * conversation.
 */
public class SocketServer {

    /**
     * Listens on port 6666 and starts a {@link Handler} thread for every accepted connection.
     *
     * @param args unused
     * @throws IOException if the listening socket cannot be opened or accepting fails
     */
    public static void main(String[] args) throws IOException {
        // Managed resource: the listening socket is released if accept() ever throws.
        try (ServerSocket ss = new ServerSocket(6666)) {
            System.out.println("server is running...");
            while (true) {
                Socket sock = ss.accept();
                System.out.println("connected from " + sock.getRemoteSocketAddress());
                new Handler(sock).start();
            }
        }
    }

    /** Serves a single client connection on its own thread. */
    static class Handler extends Thread {
        Socket sock;

        /**
         * @param sock the accepted client connection; it is closed when {@link #run()} finishes
         */
        public Handler(Socket sock) {
            this.sock = sock;
        }

        /**
         * Runs the conversation and always closes the socket and its streams afterwards, on both
         * the normal and the error path.
         */
        @Override
        public void run() {
            try (Socket connection = this.sock;
                    InputStream input = connection.getInputStream();
                    OutputStream output = connection.getOutputStream()) {
                handle(input, output);
            } catch (IOException e) {
                // Report instead of silently dropping the failure.
                System.err.println("connection error: " + e);
            }
            System.out.println("client disconnected.");
        }

        /**
         * Speaks the echo protocol over the given streams using UTF-8 text lines.
         *
         * @param input stream carrying the client's request lines
         * @param output stream used to send replies
         * @throws IOException if reading from or writing to the connection fails
         */
        private void handle(InputStream input, OutputStream output) throws IOException {
            BufferedWriter writer =
                    new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8));
            BufferedReader reader =
                    new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
            // Every reply must end with a real line feed; the peer reads with readLine().
            writer.write("hello\n");
            writer.flush();
            String line;
            // readLine() returns null once the client has closed its side of the connection.
            while ((line = reader.readLine()) != null) {
                if (line.equals("bye")) {
                    writer.write("bye\n");
                    writer.flush();
                    break;
                }
                writer.write("ok: " + line + "\n");
                writer.flush();
            }
        }
    }
}
Buffer
@Slf4j
public class TestFileChannelTransferTo {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        try (final FileChannel from = new FileInputStream("from.txt").getChannel();
                final FileChannel to = new FileOutputStream("to.txt").getChannel()) {
            // 剩余 未传输的 字节数
            long size = from.size();
            // Question: 上限:最大一次传输 2G 数据,超出不会被传输

            // from.transferTo(0, from.size(), to); // before: FileChannel#transferTo

            for (long remainder = size; remainder > 0; ) { // after: use for when bigger 2 GB
                long position = size - remainder; // 全部大小 - 剩余字节树
                System.out.println("position:  " + position + " remainder:" + remainder);
                // 比 输入输出流 效率高,底层使用操作系统的 零拷贝 进行优化
                remainder -= from.transferTo(position, remainder, to); // transferTo 返回实际传输的字节数
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        log.info("---time cost: " + (System.currentTimeMillis() - start) + " 毫秒---");
    }

		@Test
    public void traverse() throws IOException {
        // 这里不能使用 局部变量 int, 因为 内部类使用局部变量 必须都是 final 的,java1.8后可以不写final
        final AtomicInteger dirCount = new AtomicInteger();
        final AtomicInteger fileCount = new AtomicInteger();
        // SimpleFileVisitor uses the visitor pattern.
        Files.walkFileTree(
                Paths.get("E:\aio1"),
                new SimpleFileVisitor() {
                    @Override
                    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
                            throws IOException {
                        System.out.println(">>> 进入 目录 " + dir);
                        dirCount.incrementAndGet();
                        return super.preVisitDirectory(dir, attrs);
                    }
                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                            throws IOException {
                        // if (file.toString().endsWith(".jar")) // 加判断
                        System.out.println(file);
                        // Files.delete(file); // =================== 删除文件
                        fileCount.incrementAndGet();
                        return super.visitFile(file, attrs);
                    }
                    @Override
                    public FileVisitResult postVisitDirectory(Path dir, IOException exc)
                            throws IOException {
                        // Files.delete(dir); // =================== 删除文件夹
                        System.out.println("<<< 退出 目录 " + dir);
                        return super.postVisitDirectory(dir, exc);
                    }
                });
        System.out.println("文件夹:" + dirCount);
        System.out.println("文件数:" + fileCount);
    }
  
    @Test
    public void testCopyDirAll() throws IOException {
        String source = "E:\aio1";
        String target = "E:\aio2";
        // 同样也是遍历
        try (Stream traverseStream = Files.walk(Paths.get(source))) {
            traverseStream.forEach(
                    path -> {
                        String targetName = path.toString().replace(source, target);
                        try {
                            Path currentPath = Paths.get(targetName);
                            if (Files.isDirectory(path)) { // 如果是 目录 创建
                                Files.createDirectories(currentPath);

                            } else if (Files.isRegularFile(path)) { // 如果是 文件 复制
                                Files.copy(path, currentPath);
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
        }
    }
}
阻塞vs非阻塞
  1. 阻塞:

    1. 单线程下, 阻塞方法相互影响, 不能正常工作
    2. 多线程下, 线程数多了, cpu 占用高, 上下文切换成本高; 线程池虽然减少了线程数但只能用短连接. 都是治标不治本;
  2. 非阻塞:

    1. 在 ServerSocketChannel.accept 没有连接建立时返回 null , 继续运行
    2. SocketChannel.read 在没有数据可读时, 会返回 0, 会一直循环, 做的只是等待而已.
    3. 但线程不断运行的背后, 白白浪费了 cpu 资源.
    4. 数据复制的过程中, 线程实际上还是阻塞的
  3. 多路复用

    1. 单线程配合 Selector 完成对多个 Channel 可读写事件的监控.
      1. 在非阻塞的基础上引入了事件的概念: 没有事件时阻塞, 有事件发生才放行
      2. 多路复用仅针对网络 IO; 普通文件 IO 无法利用多路复用
select 何时不阻塞
  • 事件发生时
    • 客户端发起连接: 触发 accept 事件
    • 客户端发送数据、正常关闭或异常关闭, 都会触发 read 事件; 当数据量大于 buffer 缓冲区时, 会触发多次 read 事件
    • channel 可写, 触发 write 事件
    • in linux, nio bug 发生时也不阻塞
  • 调用 selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt
  1. 阻塞

  2. 非阻塞

  3. 多路复用

  4. 信号驱动

以上都是同步: 线程自己去获取结果

  5. 异步非阻塞 (不存在“异步阻塞”的说法): 通过回调来通知后续操作、推送结果 (至少涉及两个线程)
Netty


1、什么是Netty

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

注意:netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO

2、Netty的优势

如果使用传统NIO,其工作量大,bug 多

  • 需要自己构建协议
  • 解决 TCP 传输问题,如粘包、半包
  • 因为bug的存在,epoll 空轮询导致 CPU 100%

Netty 对 API 进行增强,使之更易用,如

  • FastThreadLocal => ThreadLocal
  • ByteBuf => ByteBuffer
二、案例 1、服务器端代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;


@Slf4j
public class Server {

    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup(2);
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.channel(NioServerSocketChannel.class);
            server.group(boss, worker);
            server.childHandler(new ChannelInitial());
            ChannelFuture future = server.bind(8080).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    static class ChannelInitial extends ChannelInitializer {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
            pipeline.addLast(new HttpServerCodec());
            pipeline.addLast(
                    new SimpleChannelInboundHandler() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg)
                                throws Exception {
                            log.info("uri is {}", msg.uri());
                            DefaultFullHttpResponse response =
                                    new DefaultFullHttpResponse(
                                            msg.protocolVersion(), HttpResponseStatus.OK);
                            byte[] bytes = "Hello world/~".getBytes();
                            response.headers().setInt(CONTENT_LENGTH, bytes.length);
                            response.content().writeBytes(bytes);
                            ctx.writeAndFlush(response);
                        }
                    });
        }
    }
}
2、客户端代码
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;


@Slf4j
public class Client {

    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.channel(NioSocketChannel.class);
            client.group(worker);
            client.handler(new ChannelInitial());
            ChannelFuture future = client.connect("localhost", 8080).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
        }
    }

    static class ChannelInitial extends ChannelInitializer {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new LoggingHandler());
            pipeline.addLast(
                    new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            byte[] LINE = {'n', 'r'};
                            ByteBuf buffer = ctx.alloc().buffer(); // allocate 分配 调拨
                            buffer.writeBytes("*3".getBytes()).writeBytes(LINE);
                            buffer.writeBytes("$3".getBytes()).writeBytes(LINE);
                            buffer.writeBytes("set".getBytes()).writeBytes(LINE);
                            buffer.writeBytes("$4".getBytes()).writeBytes(LINE);
                            buffer.writeBytes("name".getBytes()).writeBytes(LINE);
                            buffer.writeBytes("$7".getBytes()).writeBytes(LINE);
                            buffer.writeBytes("howeres".getBytes()).writeBytes(LINE);
                            ctx.writeAndFlush(buffer);
                            super.channelActive(ctx);
                        }

                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg)
                                throws Exception {
                            ByteBuf buf = (ByteBuf) msg;
                            log.info("msg: {}", buf.toString(StandardCharsets.UTF_8));
                            super.channelRead(ctx, msg);
                        }
                    });
        }
    }
}
Reference

https://www.alibabacloud.com/blog/essential-technologies-for-java-developers-io-and-netty_597367
https://alibaba-cloud.medium.com/essential-technologies-for-java-developers-i-o-and-netty-ec765676fd21

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/951289.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号