经常有这样的需求:读取一个文件并用多线程处理,再将处理结果保存到另一个文件。如果等全部处理完成后再保存,容易内存溢出,所以写了一个工具类:多线程计算,同时由单线程边处理边写入文件。
使用方法
// Example: pass every non-blank line of the input file through unchanged, using 30 compute threads.
String inputPath = "/data/test.txt";
String outputPath = "/data/out.txt";
// First lambda: one input line -> list of results. Second lambda: one result -> strings to write.
// Each written string ends with a newline so the output keeps one record per line.
int valid = ReadWriteTaskUtil.startCompute(inputPath, outputPath, 30,
        line -> Arrays.asList(line), line -> Arrays.asList(String.format("%s\n", line)));
// Number of results handed to the writer, or -1 if the run failed with an exception.
System.out.println(valid);
ReadWriteTaskUtil代码如下:
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@Slf4j
public class ReadWriteTaskUtil {
public static int startCompute(String inputPath, String outputPath, int threadNum, Function> computeFunction, Function> writeFunction) {
try (FileInputStream inputStream = new FileInputStream(inputPath)) {
return startCompute(inputStream, outputPath, threadNum, computeFunction, writeFunction);
} catch (Exception ex) {
log.error("computeError", ex);
}
return -1;
}
public static int startCompute(InputStream is, String outputPath, int threadNum, Function> computeFunction, Function> writeFunction) {
linkedBlockingQueue queue = new linkedBlockingQueue<>(10000);
ExecutorService writeExecutor = ThreadUtil.getBlockThreadPoolExecutor(1, "MutliReadSingleWriteTaskWriteThread");
writeExecutor.execute(new FileWrite(outputPath, queue, writeFunction));
ExecutorService executorService = ThreadUtil.getBlockThreadPoolExecutor(threadNum, "MutliReadSingleWriteTasKReadThread");
int total = 0;
AtomicInteger valid = new AtomicInteger();
try (
Reader r = new InputStreamReader(is);
BufferedReader reader = new BufferedReader(r)
) {
String line;
while ((line = reader.readLine()) != null) {
if (StringUtils.isBlank(line)) {
continue;
}
executorService.execute(new Compute(line, valid, queue, computeFunction));
total += 1;
if (total % 1000 == 0) {
log.info(String.format("read %d lines, valid %d", total, valid));
}
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
try {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
try {
writeExecutor.shutdownNow();
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return valid.get();
}
@AllArgsConstructor
private static class Compute implements Runnable {
private String line;
private AtomicInteger valid;
private linkedBlockingQueue queue;
private Function> function;
@Override
public void run() {
try {
List result = function.apply(line);
for (T t : result) {
queue.put(t);
}
valid.addAndGet(result.size());
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
@AllArgsConstructor
private static class FileWrite implements Runnable {
private String path;
private linkedBlockingQueue queue;
private Function> function;
@Override
public void run() {
try (BufferedWriter writer = new BufferedWriter(new FileWriter(path))) {
log.info("write result");
T result = null;
boolean interrrupt = false;
while (true) {
try {
result = queue.poll(10, TimeUnit.SECONDS);
if (null == result && interrrupt) {
break;
}
if (null == result) {
continue;
}
List resultList = function.apply(result);
for (String str : resultList) {
writer.write(str);
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
interrrupt = true;
}
log.error(e.getMessage(), e);
}
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
public static void main(String[] args) {
String inputPath = "/data/test.txt";
String outputPath = "/data/out.txt";
int valid = startCompute(inputPath, outputPath, 30,
line -> Arrays.asList(line), line -> Arrays.asList(String.format("%sn", line)));
System.out.println(valid);
}
}
ThreadUtil代码如下
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Factory for fixed-size thread pools whose {@code execute} call blocks the submitting thread,
 * instead of failing, while the task queue is full.
 */
@Slf4j
public class ThreadUtil {

    /**
     * Creates a pool with {@code poolSize} core and maximum threads and a task queue of 50 entries.
     *
     * @param poolSize   number of threads in the pool
     * @param threadName prefix of the thread names; threads are named {@code <threadName>-thread-<n>}
     * @return the new executor, which uses {@link BlockHandler} for rejected tasks
     */
    public static ExecutorService getBlockThreadPoolExecutor(int poolSize, String threadName) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat(threadName + "-thread-%d")
                .setUncaughtExceptionHandler(new ThreadUtil.DefaultUncaughtExceptionHandler())
                .build();
        return new ThreadPoolExecutor(poolSize, poolSize, 0,
                TimeUnit.MINUTES, new ArrayBlockingQueue<>(50),
                threadFactory, new ThreadUtil.BlockHandler());
    }

    /** Logs exceptions that escape a pool thread together with the thread's name. */
    static class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.error("uncaught exception occurred in {}", t.getName(), e);
        }
    }

    /**
     * Rejection handler that makes the submitting thread wait for free queue space.
     *
     * <p>A task rejected after the executor has been shut down is dropped without notice.
     */
    static class BlockHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (!executor.isShutdown()) {
                try {
                    // put() blocks until the queue has room
                    executor.getQueue().put(r);
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Keep the interrupt visible to the submitting thread; the task is not queued.
                        Thread.currentThread().interrupt();
                    }
                    log.error("线程中断", e);
                }
            }
        }
    }
}



