我通过实现算法来解决这个问题:观察者线程会将文件名放在BlockingQueue中,其他线程将轮询此队列,获取文件名,尝试几次打开文件。如果打开文件,则Windows
Copier已释放文件锁定,我们可以继续进行。因此,当其他线程发现文件已被解锁时,其他线程会将此文件名放入已处理的队列中,我的应用程序将从该文件名中检索文件名。另外,另一个线程在通过打开文件检查文件是否解锁时,如果它长时间运行以解锁文件,我们可以将此文件名放回BlockingQueue中并处理其他文件名,前者可以稍后处理。
解决方案:希望这对其他人有帮助:
package dirwatch;import java.nio.file.*;import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;import static java.nio.file.StandardWatchEventKinds.OVERFLOW;import static java.nio.file.linkOption.*;import java.nio.file.attribute.*;import java.io.*;import java.util.*;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;public class WatchDir { private final WatchService watcher; private final Map<WatchKey,Path> keys; private final boolean recursive; private boolean trace = false; private BlockingQueue<String> fileProcessingQueue; / private void processEvents(){ System.out.println("DirWatcherThread started."); while(!closeWatcherThread) { // wait for key to be signalled WatchKey key; try { key = watcher.take(); } catch (InterruptedException x) { // if we are returning from these method, it means we no longer wants to watch directory // we must close thread which may be waiting for file names in queue continue; }catch(ClosedWatchServiceException cwse){ break; } Path dir = keys.get(key); if (dir == null) { System.err.println("WatchKey not recognized!!"); continue; } try{ for (WatchEvent<?> event: key.pollEvents()) { WatchEvent.Kind kind = event.kind(); if (kind == OVERFLOW) { continue; } // Context for directory entry event is the file name of entry WatchEvent<Path> ev = cast(event); Path name = ev.context(); Path child = dir.resolve(name); if(kind.equals(ENTRY_CREATE)){ // if directory is created, and watching recursively, then // register it and its sub-directories if (recursive) { try { if (Files.isDirectory(child, NOFOLLOW_linkS)) { registerAll(child); continue; } } catch (IOException x) { // ignore to keep sample readbale } } while(true){ if(fileProcessingQueue.remainingCapacity() < 2){ // if only one last can be inserted then don't queue this we need 1 empty space in queue // for swaping file names.. // sleep for some time so processing thread may have made some rooms to queue in fileQueue // this logic will not create any problems as only one this thread is inserting in queue try{ Thread.sleep(200); }catch(InterruptedException ie){ } continue; } if(!fileProcessingQueue.offer(child.toString())){ // couldn't queue this element by whatever reason.. we will try to enqueue again by continuing loop continue; }else{ // file name has been queued in queue break; } } } } // reset key and remove from set if directory no longer accessible boolean valid = key.reset(); if (!valid) { keys.remove(key); // all directories are inaccessible if (keys.isEmpty()) { break; } } }catch(ClosedWatchServiceException cwse){ break; } } closeProcessingThread = true; closeWatcherThread = true; System.out.println("DirWatcherThread exited."); } public void stopWatching(){ try{ watcher.close(); }catch(IOException ioe){ } closeProcessingThread = true; closeWatcherThread = true; } public static WatchDir watchDirectory(String dirName, boolean recursive) throws InvalidPathException, IOException, Exception{ try{ Path dir = Paths.get(dirName); final WatchDir watchDir = new WatchDir(dir, recursive); watchDir.closeProcessingThread = false; watchDir.closeWatcherThread = false; new Thread(new Runnable() { public void run() { watchDir.processFiles(); } }, "DirWatchProcessingThread").start(); new Thread(new Runnable() { public void run() { watchDir.processEvents(); } }, "DirWatcherThread").start(); return watchDir; }catch(InvalidPathException ipe){ throw ipe; }catch(IOException ioe){ throw ioe; }catch(Exception e){ throw e; } } @SuppressWarnings("unchecked") private static <T> WatchEvent<T> cast(WatchEvent<?> event) { return (WatchEvent<T>)event; } private void register(Path dir) throws IOException { //WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); WatchKey key = dir.register(watcher, ENTRY_CREATE); if (trace) { Path prev = keys.get(key); if (prev == null) { System.out.format("register: %sn", dir); } else { if (!dir.equals(prev)) { System.out.format("update: %s -> %sn", prev, dir); } } } keys.put(key, dir); } private void registerAll(final Path start) throws IOException { // register directory and sub-directories Files.walkFileTree(start, new SimpleFileVisitor<Path>() { @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { register(dir); return FileVisitResult.CONTINUE; } }); } private WatchDir(Path dir, boolean recursive) throws IOException { fileProcessingQueue = new ArrayBlockingQueue<String>(WatchDirParameters.fileQueueSize, false); processedFileQueue = new ArrayBlockingQueue<String>(WatchDirParameters.fileQueueSize, false); this.watcher = FileSystems.getDefault().newWatchService(); this.keys = new HashMap<WatchKey,Path>(); this.recursive = recursive; //CreateTxtFile.createFile(dir, 1); if (recursive) { System.out.format("Scanning %s ...n", dir); registerAll(dir); System.out.println("Done."); } else { register(dir); } // enable trace after initial registration this.trace = true; }}参数类别:
package dirwatch;public class WatchDirParameters { public static final int millisToPuaseForFileLock = 200; public static final int fileQueueSize = 500; public static final int millisToSwapFileForUnlocking = 2000;}


