关于近实时（near real-time, NRT）搜索的介绍，请参见附录1。
lucene的commit在finishCommit中完成最后一步：将pending_segments_N重命名为segments_N，并调用Directory.syncMetaData()把这次重命名（目录元数据）sync到磁盘，从而使这次提交持久化：
final String finishCommit(Directory dir) throws IOException {
if (pendingCommit == false) {
throw new IllegalStateException("prepareCommit was not called");
}
boolean success = false;
final String dest;
try {
final String src = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, "", generation);
dest = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", generation);
dir.rename(src, dest);
dir.syncmetaData();
success = true;
} finally {
if (!success) {
// deletes pending_segments_N:
rollbackCommit(dir);
}
}
lucene如何实现近实时
DirectoryReader.open(final IndexWriter indexWriter)
DirectoryReader.open(final IndexWriter indexWriter, boolean applyAllDeletes, boolean writeAllDeletes)
在以上两种方法中，lucene直接使用IndexWriter内存中的segment信息来查找索引文件，此时这些索引文件还没有被sync到磁盘（即尚未commit）。因此，新写入的数据无需等待代价较高的fsync/commit就能被搜索到，这就是“近实时”的由来。
ES中近实时的实现
@SuppressForbidden(reason = "reference counting is required here") class ElasticsearchReaderManager extends ReferenceManager{ private final BiConsumer refreshListener; ElasticsearchReaderManager(ElasticsearchDirectoryReader reader, BiConsumer refreshListener) { this.current = reader; this.refreshListener = refreshListener; refreshListener.accept(current, null); } @Override protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { reference.decRef(); } @Override protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException { final ElasticsearchDirectoryReader reader = (ElasticsearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh); if (reader != null) { refreshListener.accept(reader, referenceToRefresh); } return reader; } @Override protected boolean tryIncRef(ElasticsearchDirectoryReader reference) { return reference.tryIncRef(); } @Override protected int getRefCount(ElasticsearchDirectoryReader reference) { return reference.getRefCount(); } }
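ElasticsearchReaderManager继承自lucene的ReferenceManager，并实现了refreshIfNeeded方法：调用DirectoryReader.openIfChanged，在索引有变化时打开新的reader（否则返回null），并在打开新reader时回调refreshListener。下面是ReferenceManager的源码：maybeRefresh/maybeRefreshBlocking会调用refreshIfNeeded，若得到新的reader，再通过swapReference替换当前reader。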
public abstract class ReferenceManagerimplements Closeable { private static final String REFERENCE_MANAGER_IS_CLOSED_MSG = "this ReferenceManager is closed"; protected volatile G current; private final Lock refreshLock = new ReentrantLock(); private final List refreshListeners = new CopyOnWriteArrayList<>(); private void ensureOpen() { if (current == null) { throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG); } } private synchronized void swapReference(G newReference) throws IOException { ensureOpen(); final G oldReference = current; current = newReference; release(oldReference); } protected abstract void decRef(G reference) throws IOException; protected abstract G refreshIfNeeded(G referenceToRefresh) throws IOException; protected abstract boolean tryIncRef(G reference) throws IOException; public final G acquire() throws IOException { G ref; do { if ((ref = current) == null) { throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG); } if (tryIncRef(ref)) { return ref; } if (getRefCount(ref) == 0 && current == ref) { assert ref != null; throw new IllegalStateException("The managed reference has already closed - this is likely a bug when the reference count is modified outside of the ReferenceManager"); } } while (true); } @Override public final synchronized void close() throws IOException { if (current != null) { // make sure we can call this more than once // closeable javadoc says: // if this is already closed then invoking this method has no effect. swapReference(null); afterClose(); } } protected abstract int getRefCount(G reference); protected void afterClose() throws IOException { } private void doMaybeRefresh() throws IOException { // it's ok to call lock() here (blocking) because we're supposed to get here // from either maybeRefresh() or maybeRefreshBlocking(), after the lock has // already been obtained. Doing that protects us from an accidental bug // where this method will be called outside the scope of refreshLock. 
// Per ReentrantLock's javadoc, calling lock() by the same thread more than // once is ok, as long as unlock() is called a matching number of times. refreshLock.lock(); boolean refreshed = false; try { final G reference = acquire(); try { notifyRefreshListenersBefore(); G newReference = refreshIfNeeded(reference); if (newReference != null) { assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed"; try { swapReference(newReference); refreshed = true; } finally { if (!refreshed) { release(newReference); } } } } finally { release(reference); notifyRefreshListenersRefreshed(refreshed); } afterMaybeRefresh(); } finally { refreshLock.unlock(); } } public final boolean maybeRefresh() throws IOException { ensureOpen(); // Ensure only 1 thread does refresh at once; other threads just return immediately: final boolean doTryRefresh = refreshLock.tryLock(); if (doTryRefresh) { try { doMaybeRefresh(); } finally { refreshLock.unlock(); } } return doTryRefresh; } public final void maybeRefreshBlocking() throws IOException { ensureOpen(); // Ensure only 1 thread does refresh at once refreshLock.lock(); try { doMaybeRefresh(); } finally { refreshLock.unlock(); } } protected void afterMaybeRefresh() throws IOException { } public final void release(G reference) throws IOException { assert reference != null; decRef(reference); } private void notifyRefreshListenersBefore() throws IOException { for (RefreshListener refreshListener : refreshListeners) { refreshListener.beforeRefresh(); } } private void notifyRefreshListenersRefreshed(boolean didRefresh) throws IOException { for (RefreshListener refreshListener : refreshListeners) { refreshListener.afterRefresh(didRefresh); } } public void addListener(RefreshListener listener) { if (listener == null) { throw new NullPointerException("Listener must not be null"); } refreshListeners.add(listener); } public void removeListener(RefreshListener listener) { if (listener == null) { throw new 
NullPointerException("Listener must not be null"); } refreshListeners.remove(listener); } public interface RefreshListener { void beforeRefresh() throws IOException; void afterRefresh(boolean didRefresh) throws IOException; } }
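而ES中的synced flush则通过下面的syncFlush方法完成commit操作：它在写锁下先执行一次refresh，确认没有未提交的变更且当前commit id与预期一致后，调用commitIndexWriter提交IndexWriter（即执行lucene的commit），并重新读取最后一次提交的segment信息。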
@Override
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
    // Cheap best-effort check without locks; it is repeated under the write lock below.
    ensureOpen();
    final SyncedFlushResult earlyRejection = syncFlushPreconditionFailure(syncId, expectedCommitId);
    if (earlyRejection != null) {
        return earlyRejection;
    }
    try (ReleasableLock lock = writeLock.acquire()) {
        ensureOpen();
        ensureCanFlush();
        // Refresh so the version map shrinks. Either this is a no-op apart from that, or it
        // surfaces uncommitted changes, in which case the re-check below rejects the sync flush.
        refresh("sync_flush", SearcherScope.INTERNAL, true);
        final SyncedFlushResult rejection = syncFlushPreconditionFailure(syncId, expectedCommitId);
        if (rejection != null) {
            return rejection;
        }
        logger.trace("starting sync commit [{}]", syncId);
        commitIndexWriter(indexWriter, translog, syncId);
        logger.debug("successfully sync committed. sync id [{}].", syncId);
        lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
        return SyncedFlushResult.SUCCESS;
    } catch (IOException ex) {
        maybeFailEngine("sync commit", ex);
        throw new EngineException(shardId, "failed to sync commit", ex);
    }
}

/**
 * Checks whether a sync commit with {@code syncId} may proceed right now.
 *
 * <p>A reason for refusing is logged at trace level.
 *
 * @return {@code PENDING_OPERATIONS} if the index writer has uncommitted changes,
 *     {@code COMMIT_MISMATCH} if the last commit id differs from {@code expectedCommitId},
 *     or {@code null} if both checks pass
 */
private SyncedFlushResult syncFlushPreconditionFailure(String syncId, CommitId expectedCommitId) {
    if (indexWriter.hasUncommittedChanges()) {
        logger.trace("can't sync commit [{}]. have pending changes", syncId);
        return SyncedFlushResult.PENDING_OPERATIONS;
    }
    if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
        logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
        return SyncedFlushResult.COMMIT_MISMATCH;
    }
    return null;
}
附录1. Near real-time search | Elasticsearch Guide [7.9] | Elastic: https://www.elastic.co/guide/en/elasticsearch/reference/7.9/near-real-time.html



