栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

多线程读单线程写文件工具类ReadWriteTaskUtil

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

多线程读单线程写文件工具类ReadWriteTaskUtil

背景

经常有这样的需求：读取一个文件，用多线程逐行处理，并将处理结果保存到另一个文件。如果等全部处理完成后再统一保存，结果会堆积在内存中，容易导致内存溢出。因此写了这个工具类：由一个线程逐行读取，多个线程并行计算，计算结果经有界队列交给单个写线程，边算边写入文件。
使用方法

        String inputPath = "/data/test.txt";
        String outputPath = "/data/out.txt";
        // Identity compute step; the write step appends "\n" so each result lands on its own line.
        int valid = ReadWriteTaskUtil.startCompute(inputPath, outputPath, 30,
                line -> Arrays.asList(line), line -> Arrays.asList(String.format("%s\n", line)));
        System.out.println(valid);

ReadWriteTaskUtil代码如下:

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;


@Slf4j
public class ReadWriteTaskUtil {

    
    public static  int startCompute(String inputPath, String outputPath, int threadNum, Function> computeFunction, Function> writeFunction) {
        try (FileInputStream inputStream = new FileInputStream(inputPath)) {
            return startCompute(inputStream, outputPath, threadNum, computeFunction, writeFunction);
        } catch (Exception ex) {
            log.error("computeError", ex);
        }
        return -1;
    }

    
    public static  int startCompute(InputStream is, String outputPath, int threadNum, Function> computeFunction, Function> writeFunction) {
        linkedBlockingQueue queue = new linkedBlockingQueue<>(10000);
        ExecutorService writeExecutor = ThreadUtil.getBlockThreadPoolExecutor(1, "MutliReadSingleWriteTaskWriteThread");
        writeExecutor.execute(new FileWrite(outputPath, queue, writeFunction));

        ExecutorService executorService = ThreadUtil.getBlockThreadPoolExecutor(threadNum, "MutliReadSingleWriteTasKReadThread");
        int total = 0;
        AtomicInteger valid = new AtomicInteger();
        try (
                Reader r = new InputStreamReader(is);
                BufferedReader reader = new BufferedReader(r)
        ) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (StringUtils.isBlank(line)) {
                    continue;
                }
                executorService.execute(new Compute(line, valid, queue, computeFunction));
                total += 1;
                if (total % 1000 == 0) {
                    log.info(String.format("read %d lines, valid %d", total, valid));
                }
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }

        try {
            executorService.shutdown();
            executorService.awaitTermination(1, TimeUnit.DAYS);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }

        try {
            writeExecutor.shutdownNow();
            executorService.awaitTermination(1, TimeUnit.DAYS);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return valid.get();
    }

    @AllArgsConstructor
    private static class Compute implements Runnable {
        private String line;
        private AtomicInteger valid;
        private linkedBlockingQueue queue;
        private Function> function;

        @Override
        public void run() {
            try {
                List result = function.apply(line);
                for (T t : result) {
                    queue.put(t);
                }
                valid.addAndGet(result.size());
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    @AllArgsConstructor
    private static class FileWrite implements Runnable {
        private String path;
        private linkedBlockingQueue queue;
        private Function> function;

        @Override
        public void run() {
            try (BufferedWriter writer = new BufferedWriter(new FileWriter(path))) {
                log.info("write result");
                T result = null;
                boolean interrrupt = false;
                while (true) {
                    try {
                        result = queue.poll(10, TimeUnit.SECONDS);
                        if (null == result && interrrupt) {
                            break;
                        }
                        if (null == result) {
                            continue;
                        }
                        List resultList = function.apply(result);
                        for (String str : resultList) {
                            writer.write(str);
                        }
                    } catch (Exception e) {
                        if (e instanceof InterruptedException) {
                            interrrupt = true;
                        }
                        log.error(e.getMessage(), e);
                    }
                }
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    public static void main(String[] args) {
        String inputPath = "/data/test.txt";
        String outputPath = "/data/out.txt";
        int valid = startCompute(inputPath, outputPath, 30,
                line -> Arrays.asList(line), line -> Arrays.asList(String.format("%sn", line)));
        System.out.println(valid);
    }
}

ThreadUtil代码如下:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;



/**
 * Factory for fixed-size thread pools whose {@code execute()} blocks the submitting thread
 * while the pool's task queue is full, instead of rejecting the task.
 */
@Slf4j
public class ThreadUtil {

    /** Capacity of each pool's task queue before submitters start blocking. */
    private static final int QUEUE_CAPACITY = 50;

    /**
     * Creates a fixed-size pool with a bounded task queue and a blocking rejection policy.
     *
     * @param poolSize   number of worker threads (core size equals maximum size)
     * @param threadName prefix for worker names; threads are named {@code <threadName>-thread-<n>}
     * @return an executor whose {@code execute} blocks while its task queue is full
     */
    public static ExecutorService getBlockThreadPoolExecutor(int poolSize, String threadName) {
        // Escape '%' so a caller-supplied name cannot corrupt the name format string.
        String nameFormat = threadName.replace("%", "%%") + "-thread-%d";
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat(nameFormat)
                .setUncaughtExceptionHandler(new ThreadUtil.DefaultUncaughtExceptionHandler())
                .build();
        return new ThreadPoolExecutor(poolSize, poolSize, 0,
                TimeUnit.MINUTES, new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                threadFactory, new ThreadUtil.BlockHandler());
    }

    /** Logs any exception that escapes a worker thread. */
    static class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.error("uncaught exception occurred in {}", t.getName(), e);
        }
    }

    /** Rejection handler that makes the submitting thread wait for queue space. */
    static class BlockHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                // Tasks submitted after shutdown are discarded, as before, but no longer silently.
                log.warn("executor is shut down, discarding task {}", r);
                return;
            }
            try {
                // put() blocks until a worker takes a task off the queue.
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail loudly instead of silently losing the task.
                Thread.currentThread().interrupt();
                throw new java.util.concurrent.RejectedExecutionException(
                        "interrupted while waiting for queue space", e);
            }
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/308823.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号