- client
通过 java.net.Socket 来获取流
import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class SocketClient {
public static void main(String[] args) throws IOException {
Socket sock = new Socket("localhost", 6666); // 连接指定服务器和端口
try (InputStream input = sock.getInputStream();
OutputStream output = sock.getOutputStream()) {
handle(input, output);
}
sock.close();
System.out.println("disconnected.");
}
private static void handle(InputStream input, OutputStream output) throws IOException {
BufferedWriter writer =
new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8));
BufferedReader reader =
new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
Scanner scanner = new Scanner(System.in);
System.out.println("[server] " + reader.readLine());
for (; ; ) {
System.out.print(">>> "); // 打印提示
String s = scanner.nextLine(); // 读取一行输入
writer.write(s);
writer.newLine();
writer.flush();
String resp = reader.readLine();
System.out.println("<<< " + resp);
if (resp.equals("bye")) {
break;
}
}
}
}
- server
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
@SuppressWarnings("all")
public class SocketServer {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(6666); // 监听指定端口
System.out.println("server is running...");
for (; ; ) {
Socket sock = ss.accept();
System.out.println("connected from " + sock.getRemoteSocketAddress());
Thread t = new Handler(sock);
t.start();
}
}
static class Handler extends Thread {
Socket sock;
public Handler(Socket sock) {
this.sock = sock;
}
@Override
public void run() {
try (InputStream input = this.sock.getInputStream()) {
try (OutputStream output = this.sock.getOutputStream()) {
handle(input, output);
}
} catch (Exception e) {
try {
this.sock.close();
} catch (IOException ignored) {
}
}
System.out.println("client disconnected.");
}
private void handle(InputStream input, OutputStream output) throws IOException {
BufferedWriter writer =
new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8));
BufferedReader reader =
new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
writer.write("hellon");
writer.flush();
for (; ; ) {
String s = reader.readLine();
if (s.equals("bye")) {
writer.write("byen");
writer.flush();
break;
}
writer.write("ok: " + s + "n");
writer.flush();
}
}
}
}
Buffer
@Slf4j
public class TestFileChannelTransferTo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
try (final FileChannel from = new FileInputStream("from.txt").getChannel();
final FileChannel to = new FileOutputStream("to.txt").getChannel()) {
// 剩余 未传输的 字节数
long size = from.size();
// Question: 上限:最大一次传输 2G 数据,超出不会被传输
// from.transferTo(0, from.size(), to); // before: FileChannel#transferTo
for (long remainder = size; remainder > 0; ) { // after: use for when bigger 2 GB
long position = size - remainder; // 全部大小 - 剩余字节树
System.out.println("position: " + position + " remainder:" + remainder);
// 比 输入输出流 效率高,底层使用操作系统的 零拷贝 进行优化
remainder -= from.transferTo(position, remainder, to); // transferTo 返回实际传输的字节数
}
} catch (IOException e) {
e.printStackTrace();
}
log.info("---time cost: " + (System.currentTimeMillis() - start) + " 毫秒---");
}
@Test
public void traverse() throws IOException {
// 这里不能使用 局部变量 int, 因为 内部类使用局部变量 必须都是 final 的,java1.8后可以不写final
final AtomicInteger dirCount = new AtomicInteger();
final AtomicInteger fileCount = new AtomicInteger();
// SimpleFileVisitor uses the visitor pattern.
Files.walkFileTree(
Paths.get("E:\aio1"),
new SimpleFileVisitor() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
System.out.println(">>> 进入 目录 " + dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
// if (file.toString().endsWith(".jar")) // 加判断
System.out.println(file);
// Files.delete(file); // =================== 删除文件
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc)
throws IOException {
// Files.delete(dir); // =================== 删除文件夹
System.out.println("<<< 退出 目录 " + dir);
return super.postVisitDirectory(dir, exc);
}
});
System.out.println("文件夹:" + dirCount);
System.out.println("文件数:" + fileCount);
}
@Test
public void testCopyDirAll() throws IOException {
String source = "E:\aio1";
String target = "E:\aio2";
// 同样也是遍历
try (Stream traverseStream = Files.walk(Paths.get(source))) {
traverseStream.forEach(
path -> {
String targetName = path.toString().replace(source, target);
try {
Path currentPath = Paths.get(targetName);
if (Files.isDirectory(path)) { // 如果是 目录 创建
Files.createDirectories(currentPath);
} else if (Files.isRegularFile(path)) { // 如果是 文件 复制
Files.copy(path, currentPath);
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
}
阻塞vs非阻塞
-
阻塞:
- 单线程下, 阻塞方法相互影响, 不能正常工作
- 多线程下, 线程数多了, cpu 占用高, 上下文切换成本高; 线程池虽然减少了线程数但只能用短连接. 都是治标不治本;
-
非阻塞:
- 在 ServerSocketChannel.accept 没有连接建立时返回 null , 继续运行
- SocketChannel.read 在没有数据可读时, 会返回 0, 会一直循环, 做的只是等待而已.
- 但线程不断运行的背后, 白白浪费了 cpu 资源.
- 数据复制的过程中, 线程实际上还是阻塞的
-
多路复用
- 单线程配合 Selector 完成对多个 Channel 可读写事件的监控.
- 在非阻塞的基础上引入了事件的概念. 表现在未分配时阻塞, 有事件就放行
- 多路复用仅针对网络IO、普通文件IO 没办法利用多路复用
- 单线程配合 Selector 完成对多个 Channel 可读写事件的监控.
select 何时不阻塞
- 事件发生时
- 客服端发起连接: accept
- client send data, normal close & exception close 都会触发: > read; when data > buffer缓冲区, 会触发多次读取事件
- channel 可写, 触发 write 事件
- in linux, nio bug 发生时也不阻塞
- 调用 selector.wakeup()
- 调用 selector.close()
- selector 所在线程 interrupt
-
阻塞
-
非阻塞
-
多路复用
-
信号驱动
以上都是同步: 线程自己去获取结果
- 异步非阻塞(异步没有阻塞的) 通过回调来通知后续操作, 推送结果 (至少两个线程)
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
注意:netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO
2、Netty的优势如果使用传统NIO,其工作量大,bug 多
- 需要自己构建协议
- 解决 TCP 传输问题,如粘包、半包
- 因为bug的存在,epoll 空轮询导致 CPU 100%
Netty 对 API 进行增强,使之更易用,如
- FastThreadLocal => ThreadLocal
- ByteBuf => ByteBuffer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
@Slf4j
public class Server {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup(2);
try {
ServerBootstrap server = new ServerBootstrap();
server.channel(NioServerSocketChannel.class);
server.group(boss, worker);
server.childHandler(new ChannelInitial());
ChannelFuture future = server.bind(8080).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
static class ChannelInitial extends ChannelInitializer {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(
new SimpleChannelInboundHandler() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg)
throws Exception {
log.info("uri is {}", msg.uri());
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.OK);
byte[] bytes = "Hello world/~".getBytes();
response.headers().setInt(CONTENT_LENGTH, bytes.length);
response.content().writeBytes(bytes);
ctx.writeAndFlush(response);
}
});
}
}
}
2、客户端代码
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Client {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap client = new Bootstrap();
client.channel(NioSocketChannel.class);
client.group(worker);
client.handler(new ChannelInitial());
ChannelFuture future = client.connect("localhost", 8080).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
static class ChannelInitial extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler());
pipeline.addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
byte[] LINE = {'n', 'r'};
ByteBuf buffer = ctx.alloc().buffer(); // allocate 分配 调拨
buffer.writeBytes("*3".getBytes()).writeBytes(LINE);
buffer.writeBytes("$3".getBytes()).writeBytes(LINE);
buffer.writeBytes("set".getBytes()).writeBytes(LINE);
buffer.writeBytes("$4".getBytes()).writeBytes(LINE);
buffer.writeBytes("name".getBytes()).writeBytes(LINE);
buffer.writeBytes("$7".getBytes()).writeBytes(LINE);
buffer.writeBytes("howeres".getBytes()).writeBytes(LINE);
ctx.writeAndFlush(buffer);
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info("msg: {}", buf.toString(StandardCharsets.UTF_8));
super.channelRead(ctx, msg);
}
});
}
}
}
Reference
https://www.alibabacloud.com/blog/essential-technologies-for-java-developers-io-and-netty_597367
https://alibaba-cloud.medium.com/essential-technologies-for-java-developers-i-o-and-netty-ec765676fd21



