栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java使用多线程读取超大文件

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java使用多线程读取超大文件

接上次写的“JAVA读取超大文件”。在读取超过10G的文件时会发现一次读一行的速度实在是不能接受,想到使用多线程+FileChannel来做一个使用多线程版本。

基本思路如下:

1.计算出文件总大小

2.分段处理,计算出每个线程读取文件的开始与结束位置

  (文件大小/线程数)*N,N是指第几个线程,这样能得到每个线程在读该文件的大概起始位置

使用"大概起始位置",作为读文件的开始偏移量(fileChannel.position("大概起始位置")),来读取该文件,直到读到第一个换行符,记录下这个换行符的位置,作为该线程的准确起 始位置.同时它也是上一个线程的结束位置.最后一个线程的结束位置也直接设置为-1

3.启动线程,每个线程从开始位置读取到结束位置为止

代码如下:

读文件工具类

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Observable;
 

public class ReadFile extends Observable {
 
 private int bufSize = 1024;
 // 换行符
 private byte key = "n".getBytes()[0];
 // 当前行数
 private long lineNum = 0;
 // 文件编码,默认为gb2312
 private String encode = "gb2312";
 // 具体业务逻辑监听器
 private ReaderFileListener readerListener;
 
 public void setEncode(String encode) {
 this.encode = encode;
 }
 
 public void setReaderListener(ReaderFileListener readerListener) {
 this.readerListener = readerListener;
 }
 
 
 public long getStartNum(File file, long position) throws Exception {
 long startNum = position;
 FileChannel fcin = new RandomAccessFile(file, "r").getChannel();
 fcin.position(position);
 try {
  int cache = 1024;
  ByteBuffer rBuffer = ByteBuffer.allocate(cache);
  // 每次读取的内容
  byte[] bs = new byte[cache];
  // 缓存
  byte[] tempBs = new byte[0];
  String line = "";
  while (fcin.read(rBuffer) != -1) {
  int rSize = rBuffer.position();
  rBuffer.rewind();
  rBuffer.get(bs);
  rBuffer.clear();
  byte[] newStrByte = bs;
  // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
  if (null != tempBs) {
   int tL = tempBs.length;
   newStrByte = new byte[rSize + tL];
   System.arraycopy(tempBs, 0, newStrByte, 0, tL);
   System.arraycopy(bs, 0, newStrByte, tL, rSize);
  }
  // 获取开始位置之后的第一个换行符
  int endIndex = indexOf(newStrByte, 0);
  if (endIndex != -1) {
   return startNum + endIndex;
  }
  tempBs = substring(newStrByte, 0, newStrByte.length);
  startNum += 1024;
  }
 } catch (Exception e) {
  e.printStackTrace();
 } finally {
  fcin.close();
 }
 return position;
 }
 
 
 public void readFileByLine(String fullPath, long start, long end) throws Exception {
 File fin = new File(fullPath);
 if (fin.exists()) {
  FileChannel fcin = new RandomAccessFile(fin, "r").getChannel();
  fcin.position(start);
  try {
  ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);
  // 每次读取的内容
  byte[] bs = new byte[bufSize];
  // 缓存
  byte[] tempBs = new byte[0];
  String line = "";
  // 当前读取文件位置
  long nowCur = start;
  while (fcin.read(rBuffer) != -1) {
   nowCur += bufSize;
 
   int rSize = rBuffer.position();
   rBuffer.rewind();
   rBuffer.get(bs);
   rBuffer.clear();
   byte[] newStrByte = bs;
   // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
   if (null != tempBs) {
   int tL = tempBs.length;
   newStrByte = new byte[rSize + tL];
   System.arraycopy(tempBs, 0, newStrByte, 0, tL);
   System.arraycopy(bs, 0, newStrByte, tL, rSize);
   }
   // 是否已经读到最后一位
   boolean isEnd = false;
   // 如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置
   if (end > 0 && nowCur > end) {
   // 缓存长度 - 当前已经读取位数 - 最后位数
   int l = newStrByte.length - (int) (nowCur - end);
   newStrByte = substring(newStrByte, 0, l);
   isEnd = true;
   }
   int fromIndex = 0;
   int endIndex = 0;
   // 每次读一行内容,以 key(默认为n) 作为结束符
   while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {
   byte[] bLine = substring(newStrByte, fromIndex, endIndex);
   line = new String(bLine, 0, bLine.length, encode);
   lineNum++;
   // 输出一行内容,处理方式由调用方提供
   readerListener.outLine(line.trim(), lineNum, false);
   fromIndex = endIndex + 1;
   }
   // 将未读取完成的内容放到缓存中
   tempBs = substring(newStrByte, fromIndex, newStrByte.length);
   if (isEnd) {
   break;
   }
  }
  // 将剩下的最后内容作为一行,输出,并指明这是最后一行
  String lineStr = new String(tempBs, 0, tempBs.length, encode);
  readerListener.outLine(lineStr.trim(), lineNum, true);
  } catch (Exception e) {
  e.printStackTrace();
  } finally {
  fcin.close();
  }
 
 } else {
  throw new FileNotFoundException("没有找到文件:" + fullPath);
 }
 // 通知观察者,当前工作已经完成
 setChanged();
 notifyObservers(start+"-"+end);
 }
 
 
 private int indexOf(byte[] src, int fromIndex) throws Exception {
 
 for (int i = fromIndex; i < src.length; i++) {
  if (src[i] == key) {
  return i;
  }
 }
 return -1;
 }
 
 
 private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception {
 int size = endIndex - fromIndex;
 byte[] ret = new byte[size];
 System.arraycopy(src, fromIndex, ret, 0, size);
 return ret;
 }
 
}

读文件线程


public class ReadFileThread extends Thread {
 
 private ReaderFileListener processPoiDataListeners;
 private String filePath;
 private long start;
 private long end;
 
 public ReadFileThread(ReaderFileListener processPoiDataListeners,long start,long end,String file) {
 this.setName(this.getName()+"-ReadFileThread");
 this.start = start;
 this.end = end;
 this.filePath = file;
 this.processPoiDataListeners = processPoiDataListeners;
 }
 
 @Override
 public void run() {
 ReadFile readFile = new ReadFile();
 readFile.setReaderListener(processPoiDataListeners);
 readFile.setEncode(processPoiDataListeners.getEncode());
// readFile.addObserver();
 try {
  readFile.readFileByLine(filePath, start, end + 1);
 } catch (Exception e) {
  e.printStackTrace();
 }
 }
}

具体业务逻辑监听


public abstract class ReaderFileListener {
 
 // 一次读取行数,默认为500
 private int readColNum = 500;
 
 private String encode;
 
 private List list = new ArrayList();
 
 
 protected void setReadColNum(int readColNum) {
 this.readColNum = readColNum;
 }
 
 public String getEncode() {
 return encode;
 }
 
 public void setEncode(String encode) {
 this.encode = encode;
 }
 
 
 public void outLine(String lineStr, long lineNum, boolean over) throws Exception {
 if(null != lineStr)
  list.add(lineStr);
 if (!over && (lineNum % readColNum == 0)) {
  output(list);
  list.clear();
 } else if (over) {
  output(list);
  list.clear();
 }
 }
 
 
 public abstract void output(List stringList) throws Exception;
 
}

线程调度

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
 

public class BuildData {
 public static void main(String[] args) throws Exception {
 File file = new File("E:\1396341974289.csv");
 FileInputStream fis = null;
 try {
  ReadFile readFile = new ReadFile();
  fis = new FileInputStream(file);
  int available = fis.available();
  int maxThreadNum = 50;
  // 线程粗略开始位置
  int i = available / maxThreadNum;
  for (int j = 0; j < maxThreadNum; j++) {
  // 计算精确开始位置
  long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);
  long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2;
  // 具体监听实现
  ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("gbk");
  new ReadFileThread(listeners, startNum, endNum, file.getPath()).start();
  }
 } catch (IOException e) {
  e.printStackTrace();
 } catch (Exception e) {
  e.printStackTrace();
 }
 }
}

现在就可以尽情的调整 maxThreadNum来享受风一般的速度吧!

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/137171.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号