Near Real-Time Search in ES

For an introduction to near real-time (NRT) search, see reference 1 at the end of this article.

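Lucene completes the sync to disk in finishCommit: the pending segments file is renamed to segments_N and the directory metadata is synced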

  final String finishCommit(Directory dir) throws IOException {
    if (pendingCommit == false) {
      throw new IllegalStateException("prepareCommit was not called");
    }
    boolean success = false;
    final String dest;
    try {
      final String src = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, "", generation);
      dest = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", generation);
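      // Publish the commit: pending_segments_N becomes segments_N.
      dir.rename(src, dest);
      // Sync the directory metadata so that the rename itself is durable.
      dir.syncMetaData();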
      success = true;
    } finally {
      if (!success) {
        // deletes pending_segments_N:
        rollbackCommit(dir);
      }
    }

    pendingCommit = false;
    lastGeneration = generation;
    return dest;
  }
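The same rename-then-sync pattern can be sketched with plain java.nio. This is a minimal illustration, not Lucene code: the class TwoPhaseCommitSketch and its methods are hypothetical names, and the directory fsync is treated as best effort because some platforms cannot open a directory for syncing.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; it is not part of Lucene.
final class TwoPhaseCommitSketch {

  // Generation suffix in base 36, mirroring the segments_N naming style.
  private static String suffix(long generation) {
    return Long.toString(generation, Character.MAX_RADIX);
  }

  // Phase 1: write the pending file and fsync its contents.
  static Path prepareCommit(Path dir, long generation, String payload) throws IOException {
    Path pending = dir.resolve("pending_segments_" + suffix(generation));
    try (FileChannel channel = FileChannel.open(pending, StandardOpenOption.CREATE,
        StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
      ByteBuffer buffer = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));
      while (buffer.hasRemaining()) {
        channel.write(buffer);
      }
      channel.force(true); // flush file data and metadata to the device
    }
    return pending;
  }

  // Phase 2: rename atomically, then fsync the directory so the rename survives a crash.
  static Path finishCommit(Path dir, long generation) throws IOException {
    Path src = dir.resolve("pending_segments_" + suffix(generation));
    Path dest = dir.resolve("segments_" + suffix(generation));
    boolean success = false;
    try {
      Files.move(src, dest, StandardCopyOption.ATOMIC_MOVE);
      syncDirectory(dir);
      success = true;
    } finally {
      if (!success) {
        Files.deleteIfExists(src); // roll back the pending file
      }
    }
    return dest;
  }

  // Best-effort directory fsync.
  static void syncDirectory(Path dir) {
    try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
      channel.force(true);
    } catch (IOException e) {
      // Some platforms cannot open or sync a directory; ignored in this sketch.
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("commit-sketch");
    prepareCommit(dir, 3, "segment metadata");
    System.out.println(finishCommit(dir, 3).getFileName());
  }
}
```

Readers that only trust files named segments_N never observe a half-written commit, because the name appears only after the contents have been synced.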

How Lucene implements near real-time search

DirectoryReader.open(final IndexWriter indexWriter)

DirectoryReader.open(final IndexWriter indexWriter, boolean applyAllDeletes, boolean writeAllDeletes)

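With either of these two methods, Lucene locates the index files through the segment information that the IndexWriter holds in memory; at that point the index files have not yet been synced to disk.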
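The difference between a reader opened from the writer and a reader opened from the last commit can be pictured with a toy model. This is only a sketch under simplifying assumptions: ToyWriter and its methods are hypothetical and are not Lucene's API, and plain lists stand in for segments.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: names are hypothetical and lists stand in for segments.
final class NrtSketch {

  static final class ToyWriter {
    private final List<String> committed = new ArrayList<>(); // durable after commit()
    private final List<String> buffered = new ArrayList<>();  // written but not yet committed

    synchronized void addDocument(String doc) {
      buffered.add(doc);
    }

    // Expensive path: everything buffered becomes part of the commit point.
    synchronized void commit() {
      committed.addAll(buffered);
      buffered.clear();
    }

    // NRT path: a point-in-time view built from the writer's in-memory state,
    // so uncommitted documents are visible without waiting for a commit.
    synchronized List<String> openNrtReader() {
      List<String> view = new ArrayList<>(committed);
      view.addAll(buffered);
      return Collections.unmodifiableList(view);
    }

    // A view of the last commit point only.
    synchronized List<String> openCommittedReader() {
      return Collections.unmodifiableList(new ArrayList<>(committed));
    }
  }

  public static void main(String[] args) {
    ToyWriter writer = new ToyWriter();
    writer.addDocument("doc-1");
    System.out.println("NRT reader sees " + writer.openNrtReader().size() + " document(s)");
    System.out.println("Committed reader sees " + writer.openCommittedReader().size() + " document(s)");
  }
}
```

The model leaves out deletes, merges and file handling; its only purpose is to show why an NRT reader can return documents that would be lost if the process crashed before the next commit.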

How ES implements near real-time search

@SuppressForbidden(reason = "reference counting is required here")
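class ElasticsearchReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
    // Called with (newReader, previousReader) each time a reader is installed.
    private final BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener;

    ElasticsearchReaderManager(ElasticsearchDirectoryReader reader,
                               BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener) {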
        this.current = reader;
        this.refreshListener = refreshListener;
        refreshListener.accept(current, null);
    }

    @Override
    protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
        reference.decRef();
    }

    @Override
    protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
        final ElasticsearchDirectoryReader reader = (ElasticsearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh);
        if (reader != null) {
            refreshListener.accept(reader, referenceToRefresh);
        }
        return reader;
    }

    @Override
    protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
        return reference.tryIncRef();
    }

    @Override
    protected int getRefCount(ElasticsearchDirectoryReader reference) {
        return reference.getRefCount();
    }
}

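ElasticsearchReaderManager extends Lucene's ReferenceManager and implements its refreshIfNeeded method, which calls DirectoryReader.openIfChanged and returns a new reader only when the index has changed.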

public abstract class ReferenceManager<G> implements Closeable {

  private static final String REFERENCE_MANAGER_IS_CLOSED_MSG = "this ReferenceManager is closed";
  
  protected volatile G current;
  
  private final Lock refreshLock = new ReentrantLock();

  private final List<RefreshListener> refreshListeners = new CopyOnWriteArrayList<>();

  private void ensureOpen() {
    if (current == null) {
      throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG);
    }
  }
  
  private synchronized void swapReference(G newReference) throws IOException {
    ensureOpen();
    final G oldReference = current;
    current = newReference;
    release(oldReference);
  }

  
  protected abstract void decRef(G reference) throws IOException;
  
  
  protected abstract G refreshIfNeeded(G referenceToRefresh) throws IOException;

  
  protected abstract boolean tryIncRef(G reference) throws IOException;

  
  public final G acquire() throws IOException {
    G ref;

    do {
      if ((ref = current) == null) {
        throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG);
      }
      if (tryIncRef(ref)) {
        return ref;
      }
      if (getRefCount(ref) == 0 && current == ref) {
        assert ref != null;
        
        throw new IllegalStateException("The managed reference has already closed - this is likely a bug when the reference count is modified outside of the ReferenceManager");
      }
    } while (true);
  }
  
  
  @Override
  public final synchronized void close() throws IOException {
    if (current != null) {
      // make sure we can call this more than once
      // closeable javadoc says:
      // if this is already closed then invoking this method has no effect.
      swapReference(null);
      afterClose();
    }
  }

  
  protected abstract int getRefCount(G reference);

  
  protected void afterClose() throws IOException {
  }

  private void doMaybeRefresh() throws IOException {
    // it's ok to call lock() here (blocking) because we're supposed to get here
    // from either maybeRefresh() or maybeRefreshBlocking(), after the lock has
    // already been obtained. Doing that protects us from an accidental bug
    // where this method will be called outside the scope of refreshLock.
    // Per ReentrantLock's javadoc, calling lock() by the same thread more than
    // once is ok, as long as unlock() is called a matching number of times.
    refreshLock.lock();
    boolean refreshed = false;
    try {
      final G reference = acquire();
      try {
        notifyRefreshListenersBefore();
        G newReference = refreshIfNeeded(reference);
        if (newReference != null) {
          assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
          try {
            swapReference(newReference);
            refreshed = true;
          } finally {
            if (!refreshed) {
              release(newReference);
            }
          }
        }
      } finally {
        release(reference);
        notifyRefreshListenersRefreshed(refreshed);
      }
      afterMaybeRefresh();
    } finally {
      refreshLock.unlock();
    }
  }

  
  public final boolean maybeRefresh() throws IOException {
    ensureOpen();

    // Ensure only 1 thread does refresh at once; other threads just return immediately:
    final boolean doTryRefresh = refreshLock.tryLock();
    if (doTryRefresh) {
      try {
        doMaybeRefresh();
      } finally {
        refreshLock.unlock();
      }
    }

    return doTryRefresh;
  }
  
  
  public final void maybeRefreshBlocking() throws IOException {
    ensureOpen();

    // Ensure only 1 thread does refresh at once
    refreshLock.lock();
    try {
      doMaybeRefresh();
    } finally {
      refreshLock.unlock();
    }
  }

  
  protected void afterMaybeRefresh() throws IOException {
  }
  
  
  public final void release(G reference) throws IOException {
    assert reference != null;
    decRef(reference);
  }

  private void notifyRefreshListenersBefore() throws IOException {
    for (RefreshListener refreshListener : refreshListeners) {
      refreshListener.beforeRefresh();
    }
  }

  private void notifyRefreshListenersRefreshed(boolean didRefresh) throws IOException {
    for (RefreshListener refreshListener : refreshListeners) {
      refreshListener.afterRefresh(didRefresh);
    }
  }

  
  public void addListener(RefreshListener listener) {
    if (listener == null) {
      throw new NullPointerException("Listener must not be null");
    }
    refreshListeners.add(listener);
  }

  
  public void removeListener(RefreshListener listener) {
    if (listener == null) {
      throw new NullPointerException("Listener must not be null");
    }
    refreshListeners.remove(listener);
  }

  
  public interface RefreshListener {

    
    void beforeRefresh() throws IOException;

    
    void afterRefresh(boolean didRefresh) throws IOException;
  }
}
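The acquire/release and refresh logic in the listing above can be condensed into a small stand-alone sketch. It is a simplified model, not Lucene's class: Snapshot and SnapshotManager are hypothetical names, an AtomicInteger stands in for the index whose changes trigger a refresh, and closing the manager and refresh listeners are left out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Simplified model of the reference-counting pattern; all names are hypothetical.
final class RefCountSketch {

  // Stands in for a reader: a version plus a reference count.
  static final class Snapshot {
    final int version;
    private final AtomicInteger refCount = new AtomicInteger(1); // the manager's own reference

    Snapshot(int version) {
      this.version = version;
    }

    // Increment only while the count is still positive; fail once it has reached zero.
    boolean tryIncRef() {
      int count;
      while ((count = refCount.get()) > 0) {
        if (refCount.compareAndSet(count, count + 1)) {
          return true;
        }
      }
      return false;
    }

    void decRef() {
      refCount.decrementAndGet();
    }

    int getRefCount() {
      return refCount.get();
    }
  }

  static final class SnapshotManager {
    private final AtomicInteger sourceVersion; // assumed source of truth, bumped by a writer
    private final ReentrantLock refreshLock = new ReentrantLock();
    private volatile Snapshot current;

    SnapshotManager(AtomicInteger sourceVersion) {
      this.sourceVersion = sourceVersion;
      this.current = new Snapshot(sourceVersion.get());
    }

    // Retry until a reference is obtained on whatever snapshot is current.
    Snapshot acquire() {
      while (true) {
        Snapshot ref = current;
        if (ref.tryIncRef()) {
          return ref;
        }
        if (ref.getRefCount() == 0 && current == ref) {
          throw new IllegalStateException("reference count was modified outside the manager");
        }
      }
    }

    void release(Snapshot ref) {
      ref.decRef();
    }

    // Non-blocking refresh: if another thread is already refreshing, return immediately.
    boolean maybeRefresh() {
      if (!refreshLock.tryLock()) {
        return false;
      }
      try {
        Snapshot old = acquire();
        try {
          int latest = sourceVersion.get();
          if (latest != old.version) {
            current = new Snapshot(latest); // swap in the new snapshot
            release(old);                   // drop the manager's reference to the old one
          }
        } finally {
          release(old);                     // drop the reference taken by acquire() above
        }
      } finally {
        refreshLock.unlock();
      }
      return true;
    }
  }

  public static void main(String[] args) {
    AtomicInteger source = new AtomicInteger(0);
    SnapshotManager manager = new SnapshotManager(source);
    Snapshot held = manager.acquire();
    source.incrementAndGet();
    manager.maybeRefresh();
    System.out.println("held version: " + held.version);
    manager.release(held);
  }
}
```

A caller pairs every acquire() with a release() in a finally block. A snapshot that is still held when a refresh swaps it out stays usable until its last holder releases it.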

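In ES, a synced flush on a shard goes through the syncFlush method below, which performs the commit via commitIndexWriter once it has verified that there are no pending changes and that the commit ID matches the expected one.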

    @Override
    public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
        // best effort attempt before we acquire locks
        ensureOpen();
        if (indexWriter.hasUncommittedChanges()) {
            logger.trace("can't sync commit [{}]. have pending changes", syncId);
            return SyncedFlushResult.PENDING_OPERATIONS;
        }
        if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
            logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
            return SyncedFlushResult.COMMIT_MISMATCH;
        }
        try (ReleasableLock lock = writeLock.acquire()) {
            ensureOpen();
            ensureCanFlush();
            // lets do a refresh to make sure we shrink the version map. This refresh will be either a no-op (just shrink the version map)
            // or we also have uncommitted changes and that causes this syncFlush to fail.
            refresh("sync_flush", SearcherScope.INTERNAL, true);
            if (indexWriter.hasUncommittedChanges()) {
                logger.trace("can't sync commit [{}]. have pending changes", syncId);
                return SyncedFlushResult.PENDING_OPERATIONS;
            }
            if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
                logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
                return SyncedFlushResult.COMMIT_MISMATCH;
            }
            logger.trace("starting sync commit [{}]", syncId);
            commitIndexWriter(indexWriter, translog, syncId);
            logger.debug("successfully sync committed. sync id [{}].", syncId);
            lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
            return SyncedFlushResult.SUCCESS;
        } catch (IOException ex) {
            maybeFailEngine("sync commit", ex);
            throw new EngineException(shardId, "failed to sync commit", ex);
        }
    }
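The listing above checks its preconditions twice: once cheaply before taking the write lock and once more while holding it. A stripped-down sketch of that check, lock, re-check shape follows. It is a toy model with hypothetical names (SyncFlushSketch, index, flush); it leaves out the refresh and the translog, and the real method writes the sync ID into a new Lucene commit rather than into a field.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Toy model of the check, lock, re-check pattern; all names are hypothetical.
final class SyncFlushSketch {

  enum Result { SUCCESS, PENDING_OPERATIONS, COMMIT_MISMATCH }

  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private volatile boolean uncommittedChanges = false;
  private volatile String lastCommitId = "commit-0";
  private volatile String syncId = null;
  private int commitCounter = 0; // only touched under the write lock

  // Indexing takes the read lock, so it cannot overlap with a flush.
  void index() {
    lock.readLock().lock();
    try {
      uncommittedChanges = true;
    } finally {
      lock.readLock().unlock();
    }
  }

  // A regular flush creates a new commit ID and clears the pending flag.
  String flush() {
    lock.writeLock().lock();
    try {
      lastCommitId = "commit-" + (++commitCounter);
      uncommittedChanges = false;
      return lastCommitId;
    } finally {
      lock.writeLock().unlock();
    }
  }

  private Result check(String expectedCommitId) {
    if (uncommittedChanges) {
      return Result.PENDING_OPERATIONS;
    }
    if (!expectedCommitId.equals(lastCommitId)) {
      return Result.COMMIT_MISMATCH;
    }
    return Result.SUCCESS;
  }

  Result syncFlush(String newSyncId, String expectedCommitId) {
    // Best-effort check before locking, so hopeless requests fail cheaply.
    Result early = check(expectedCommitId);
    if (early != Result.SUCCESS) {
      return early;
    }
    lock.writeLock().lock();
    try {
      // State may have changed while waiting for the lock, so check again.
      Result recheck = check(expectedCommitId);
      if (recheck != Result.SUCCESS) {
        return recheck;
      }
      syncId = newSyncId; // simplified stand-in for writing the sync ID into the commit
      return Result.SUCCESS;
    } finally {
      lock.writeLock().unlock();
    }
  }

  String syncId() {
    return syncId;
  }

  public static void main(String[] args) {
    SyncFlushSketch engine = new SyncFlushSketch();
    String commitId = engine.flush();
    System.out.println(engine.syncFlush("sync-1", commitId));
  }
}
```

The first check is only an optimization; correctness comes from the second check, which runs while no indexing operation can hold the read lock.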

1. Near real-time search | Elasticsearch Guide [7.9] | Elastic
