async-file工具提供Java异步读写文件能力,使用Java NIO 库开发。Java应用程序引入框架可以简单的,异步和非阻塞的读写文件。框架包含三个工具类:
- AIOFileReader:异步读取文件,使用Java NIO库 AsynchronousFileChannel 和 CompletionHandler 实现。
- AIOFileWriter:异步写入文件,使用Java NIO库 AsynchronousFileChannel 和 CompletionHandler 实现。
- NIOFileLineReader:非阻塞读取文件,使用 ForkJoinPool 和 BufferedReader 实现
提示:Java提供的 Files 文件读取功能是阻塞的。
安装教程首先,如果项目使用Maven工具,在项目的pom.xml文件中添加依赖
io.github.kavahub kava-async-file1.0.0.RELEASE
如果是Gradle项目,需要添加依赖:
AIOFileReader使用说明implementation 'io.github.kavahub:kava-async-file:1.0.0.RELEASE'
AIOFileReader方法列表:
- Query
bytes(Path file) : 读取文件,返回文件数据字节数组,读取的大小有默认缓冲区决定。 - Query
allBytes(Path file) : 读取文件,返回文件所有数据字节数组。每次按默认缓冲区读取文件,完成后合并。 - Query
line(Path file) : 读取文件,返回文件行字符串。每次按默认缓冲区读取文件数据字节数组,按换行符分割字节数组。 - Query
allLines(Path file) : 读取文件,返回文件所有数据字符串。每次按默认缓冲区读取文件数据字节数组,合并后转换成字符串。
提示:默认缓冲区大小定义
public static final int BUFFER_SIZE = 4096 * 4;
示例:
// 按行读取文件,并输出到控制台
final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");
AIOFileReader.line(FILE).subscribe((data, err) -> {
if (err != null) {
// 处理异常,如记录日志
err.printStackTrace();
}
if (data != null) {
// 文件行处理,如输出到控制台
System.out.println(data);
}
})
// 等待所有行处理完成
.join();
示例:
// 统计文件中单词个数,并找出次数最多的单词
final Path FILE = Paths.get("src", "test", "resources", "fileToCount.txt");
final int MIN = 5;
final int MAX = 10;
ConcurrentHashMap words = new ConcurrentHashMap<>();
AIOFileReader.line(FILE)
// 过滤掉前14行
.filter(line -> !line.trim().isEmpty()).skip(14)
// 使用空格分隔
.flatMapMerge(line -> Query.of(line.split(" ")))
// 过滤单词
.filter(word -> word.length() > MIN && word.length() < MAX)
// 统计单词次数
.onNext((w, err) -> words.merge(w, 1, Integer::sum))
// 阻塞,直到文件统计完毕
.blockingSubscribe();
Map.Entry common = Collections.max(words.entrySet(),
Comparator.comparingInt(e -> e.getValue().intValue()));
assertEquals("Hokosa", common.getKey());
assertEquals(183, common.getValue().intValue());
示例:
// 统计“*** END OF ”行之前所有单词的数量
// 当读取到"*** END OF "行时,读线程会取消读操作,避免继续读取不需要处理的数据
final Path FILE = Paths.get("src", "test", "resources", "fileToCount.txt");
int[] count = { 0 };
AIOFileReader.line(FILE)
// 过滤空行
.filter(line -> !line.trim().isEmpty())
// 忽略前14行
.skip(14)
// 忽略掉‘*** END OF ’以后的行
.takeWhile(line -> !line.contains("*** END OF "))
// 行按空格切割成单词
.flatMapMerge(line -> Query.of(line.split("\W+")))
// 去重
.distinct()
// 统计数量
.onNext((word, err) -> {
if (err == null)
count[0]++;
})
// 显示处理中的异常
.onNext((word, err) -> {
if (err != null)
err.printStackTrace();
})
// 阻塞,知道文件读取完成
.blockingSubscribe();
assertEquals(5206, count[0]);
示例:
// 详细演示takeWhile的功能:
// 1. 控制台输出前部文件内容,框架日志提示[Cancel file reading. [16384 bytes] has been readed],读取操作取消,不在读取文件数据。
// 2. [16384 bytes] 信息中,16384是框架默认读取缓冲区大小,由此可以判断:文件只读取了一次
final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");
int[] count = { 0 };
AIOFileReader.line(FILE)
// 控制台输出
.onNext((data, err) -> {
if (err != null) {
err.printStackTrace();
}
if (data != null) {
System.out.println("before:" + data);
}
})
// 终止文件读操纵。
.takeWhile(line -> false)
.onNext((data, err) -> {
if (err != null) {
err.printStackTrace();
}
if (data != null) {
System.out.println("after:" +data);
}
}).blockingSubscribe();
assertEquals(0, count[0]);
示例:
// 也可以使用cancel方法中断读文件操作
final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");
CompletableFuture future = AIOFileReader.line(FILE).subscribe((data, err) -> {
if (err != null) {
System.out.println("error:" + err.getMessage());
}
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
TimeUnit.MILLISECONDS.sleep(1000);
future.cancel(false);
示例:
// 显示读文件线程的名称
final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");
AIOFileReader.bytes(FILE).subscribe((data, err) -> {
if (err != null) {
err.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}).join();
输出结果如下:
Thread-8
Thread-7
Thread-8
Thread-7
Thread-8
Thread-7
Thread-8
Thread-7
Thread-8
Thread-7
...
其结果表明:有两个线程读取文件,线程交替读取以保证读取文件数据的顺序,这是 AsynchronousFileChannel 实现的
AIOFileWriter使用说明AIOFileWriter方法列表:
- CompletableFuture
write(Path file, byte[] bytes) : 字节数组数据写入文件 - CompletableFuture
write(Path file, String line) : 字符串数据写入文件 - CompletableFuture
write(Path file, Query lines) : 字符串流数据写入文件。 - CompletableFuture
write(Path file, Iterable lines) : 字符串集合数据写入文件。
示例:
// 写入字符串 AIOFileWriter.write(Paths.get(FILE_TO_WRITE), "This is file content:你好").join();
示例:
// 分割字符串写入
final String content = "This is file content:你好";
AIOFileWriter.write(Paths.get(FILE_TO_WRITE),
String.join(System.lineSeparator(), content.split(" ")))
.join();
示例:
// 字符流写入
Query data = Query.of("This is file content:你好")
.flatMapMerge(line -> Query.of(line.split(" ")))
.map((line) -> line + System.lineSeparator());
AIOFileWriter.write(Paths.get(FILE_TO_WRITE), data).join();
示例:
// 字符流转换后写入
Query data = Query.of("This is file content:你好")
.flatMapMerge(line -> Query.of(line.split(" ")))
.map((line) -> line + System.lineSeparator());
AIOFileWriter.write(Paths.get(FILE_TO_WRITE), data).join();
示例:
// 边读边写
final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");
Query reader = AIOFileReader.line(FILE)
// 忽略前2行
.skip(2)
// 过滤掉空行
.filter(line -> !line.isBlank())
// 转换成大写
.map(String::toUpperCase)
// 加入换行符
.map((line) -> line + System.lineSeparator());
AIOFileWriter.write(Paths.get(FILE_TO_WRITE), reader).join();
NIOFileLineReader使用说明
NIOFileLineReader 方法列表:
- Query
read(Path file) : 读取文件行。
示例:
// 读取文件行并过滤
final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt");
NIOFileLineReader.read(FILE).filter(line -> !line.trim().isEmpty()).onNext((data, err) -> {
System.out.println(data);
}).blockingSubscribe();
使用建议
- 文件的异步读写,并不是为了提高文件的读取性能,而是提高文件读取的吞吐量(读取更多的文件,并保持性能,使JVM可以稳定运行)。
- 在大多数情况下,使用Jdk提供的Files或许更合适。
- 不要为了异步而异步,找到问题所在,也许解决问题的关键不是异步。
建议使用优先级: Java NIO Files > NIOFileLineReader > AIOFileReader
最后,项目地址在这里:async-file
希望这对你有帮助。



