栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ES中refresh

ES中refresh

    private DefaultSearchContext createSearchContext(ShardSearchRequest request, Timevalue timeout,
                                                     boolean assertAsyncActions, String source)
            throws IOException {
        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
        IndexShard indexShard = indexService.getShard(request.shardId().getId());
        SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
                indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
        // 调用acquireSearcher方法获得Searcher对象
        Engine.Searcher searcher = indexShard.acquireSearcher(source);

        boolean success = false;
        DefaultSearchContext searchContext = null;
        try {
            // 获取searchContext。idGenerator.incrementAndGet() 作为作为唯一键来标示此searchContext
            searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
                searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout,
                fetchPhase, clusterService.state().nodes().getMinNodeVersion());
            // we clone the query shard context here just for rewriting otherwise we
            // might end up with incorrect state since we are using now() or script services
            // during rewrite and normalized / evaluate templates etc.
            QueryShardContext context = new QueryShardContext(searchContext.getQueryShardContext());
            Rewriteable.rewrite(request.getRewriteable(), context, assertAsyncActions);
            assert searchContext.getQueryShardContext().isCacheable();
            success = true;
        } finally {
            if (success == false) {
                IOUtils.closeWhileHandlingException(searchContext);
                if (searchContext == null) {
                    // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise
                    // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions).
                    IOUtils.closeWhileHandlingException(searcher);
                }
            }
        }
        return searchContext;
    }

 AsyncRefreshTask 参数是indexService

    final class AsyncRefreshTask extends baseAsyncTask {

        AsyncRefreshTask(IndexService indexService) {
            super(indexService, indexService.getIndexSettings().getRefreshInterval());
        }

        @Override
        protected void runInternal() {
            indexService.maybeRefreshEngine(false);
        }

        @Override
        protected String getThreadPool() {
            return ThreadPool.Names.REFRESH;
        }

        @Override
        public String toString() {
            return "refresh";
        }
    }

    
    public synchronized void rescheduleIfNecessary() {
        if (isClosed()) {
            return;
        }
        if (cancellable != null) {
            cancellable.cancel();
        }
        if (interval.millis() > 0 && mustReschedule()) {
            if (logger.isTraceEnabled()) {
                logger.trace("scheduling {} every {}", toString(), interval);
            }
            cancellable = threadPool.schedule(threadPool.preserveContext(this), interval, getThreadPool());
            isScheduledOrRunning = true;
        } else {
            logger.trace("scheduled {} disabled", toString());
            cancellable = null;
            isScheduledOrRunning = false;
        }
    }

    @Override
    public final void run() {
        synchronized (this) {
            if (isClosed()) {
                return;
            }
            cancellable = null;
            isScheduledOrRunning = autoReschedule;
        }
        try {
            runInternal();
        } catch (Exception ex) {
            if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
                // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
                logger.warn(
                    () -> new ParameterizedMessage(
                        "failed to run task {} - suppressing re-occurring exceptions unless the exception changes",
                        toString()),
                    ex);
                lastThrownException = ex;
            }
        } finally {
            if (autoReschedule) {
                rescheduleIfNecessary();
            }
        }
    }

参数为false的情况

        @Override
        protected void runInternal() {
            indexService.maybeRefreshEngine(false);
        }

 参数为true的情况

    @Override
    public synchronized void updatemetaData(final IndexmetaData currentIndexmetaData, final IndexmetaData newIndexmetaData) {
        final boolean updateIndexSettings = indexSettings.updateIndexmetaData(newIndexmetaData);

        if (Assertions.ENABLED
                && currentIndexmetaData != null
                && currentIndexmetaData.getCreationVersion().onOrAfter(Version.V_6_5_0)) {
            final long currentSettingsVersion = currentIndexmetaData.getSettingsVersion();
            final long newSettingsVersion = newIndexmetaData.getSettingsVersion();
            if (currentSettingsVersion == newSettingsVersion) {
                assert updateIndexSettings == false;
            } else {
                assert updateIndexSettings;
                assert currentSettingsVersion < newSettingsVersion :
                        "expected current settings version [" + currentSettingsVersion + "] "
                                + "to be less than new settings version [" + newSettingsVersion + "]";
            }
        }

        if (updateIndexSettings) {
            for (final IndexShard shard : this.shards.values()) {
                try {
                    shard.onSettingsChanged();
                } catch (Exception e) {
                    logger.warn(
                        () -> new ParameterizedMessage(
                            "[{}] failed to notify shard about setting change", shard.shardId().id()), e);
                }
            }
            if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
                // once we change the refresh interval we schedule yet another refresh
                // to ensure we are in a clean and predictable state.
                // it doesn't matter if we move from or to -1  in both cases we want
                // docs to become visible immediately. This also flushes all pending indexing / search requests
                // that are waiting for a refresh.
                threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
                    @Override
                    public void onFailure(Exception e) {
                        logger.warn("forced refresh failed after interval change", e);
                    }

                    @Override
                    protected void doRun() throws Exception {
                        maybeRefreshEngine(true);
                    }

                    @Override
                    public boolean isForceExecution() {
                        return true;
                    }
                });
                rescheduleRefreshTasks();
            }
            updateFsyncTaskIfNecessary();
        }

        metaDataListeners.forEach(c -> c.accept(newIndexmetaData));
    }
    private void maybeRefreshEngine(boolean force) {
        if (indexSettings.getRefreshInterval().millis() > 0 || force) {
            for (IndexShard shard : this.shards.values()) {
                try {
                    shard.scheduledRefresh();
                } catch (IndexShardClosedException | AlreadyClosedException ex) {
                    // fine - continue;
                }
            }
        }
    }

    
    public boolean scheduledRefresh() {
        verifyNotClosed();
        boolean listenerNeedsRefresh = refreshListeners.refreshNeeded();
        if (isReadAllowed() && (listenerNeedsRefresh || getEngine().refreshNeeded())) {
            if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
                && isSearchIdle()
                && indexSettings.isExplicitRefresh() == false
                && active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
                // lets skip this refresh since we are search idle and
                // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
                // cause the next schedule to refresh.
                final Engine engine = getEngine();
                engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
                setRefreshPending(engine);
                return false;
            } else {
                if (logger.isTraceEnabled()) {
                    logger.trace("refresh with source [schedule]");
                }
                return getEngine().maybeRefresh("schedule");
            }
        }
        final Engine engine = getEngine();
        engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
        return false;
    }

具体refresh方式

org.elasticsearch.index.engine.ElasticsearchReaderManager

    @Override
    protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
        final ElasticsearchDirectoryReader reader = (ElasticsearchDirectoryReader) 
                // 调用DirectoryReader方法进行refresh
DirectoryReader.openIfChanged(referenceToRefresh);
        if (reader != null) {
            // 统计一下内存占用情况
            refreshListener.accept(reader, referenceToRefresh);
        }
        return reader;
    }

    
  public static DirectoryReader openIfChanged(DirectoryReader oldReader) throws IOException {
    //调用oldReader的doOpenIfChanged方法
    final DirectoryReader newReader = oldReader.doOpenIfChanged();
    assert newReader != oldReader;
    return newReader;
  }
  @Override
  protected DirectoryReader doOpenIfChanged() throws IOException {
    // indexCommit设置为null
    return doOpenIfChanged((IndexCommit) null);
  }
  @Override
  protected DirectoryReader doOpenIfChanged(final IndexCommit commit) throws IOException {
    ensureOpen();

    // If we were obtained by writer.getReader(), re-ask the
    // writer to get a new reader.
    // indexWrite应不为空
    if (writer != null) {
      return doOpenFromWriter(commit);
    } else {
      return doOpenNoWriter(commit);
    }
  }
  private DirectoryReader doOpenFromWriter(IndexCommit commit) throws IOException {
    if (commit != null) {
      return doOpenFromCommit(commit);
    }

    if (writer.nrtIsCurrent(segmentInfos)) {
      return null;
    }

    // 会在这一步进行flushAllThreads操作 将内存中的数据生成segment
    DirectoryReader reader = writer.getReader(applyAllDeletes, writeAllDeletes);

    // If in fact no changes took place, return null:
    if (reader.getVersion() == segmentInfos.getVersion()) {
      reader.decRef();
      return null;
    }

    return reader;
  }

另:

org.apache.lucene.index.StandardDirectoryReader

reader 用来判断是否没有发生变更

  @Override
  public boolean isCurrent() throws IOException {
    ensureOpen();
    if (writer == null || writer.isClosed()) {
      // Fully read the segments file: this ensures that it's
      // completely written so that if
      // IndexWriter.prepareCommit has been called (but not
      // yet commit), then the reader will still see itself as
      // current:
      SegmentInfos sis = SegmentInfos.readLatestCommit(directory);

      // we loaded SegmentInfos from the directory
      return sis.getVersion() == segmentInfos.getVersion();
    } else {
      return writer.nrtIsCurrent(segmentInfos);
    }
  }
  synchronized boolean nrtIsCurrent(SegmentInfos infos) {
    ensureOpen();
    boolean isCurrent = infos.getVersion() == segmentInfos.getVersion()
      && docWriter.anyChanges() == false
      && bufferedUpdatesStream.any() == false
      && readerPool.anyDocValuesChanges() == false;
    if (infoStream.isEnabled("IW")) {
      if (isCurrent == false) {
        infoStream.message("IW", "nrtIsCurrent: infoVersion matches: " + (infos.getVersion() == segmentInfos.getVersion()) + "; DW changes: " + docWriter.anyChanges() + "; BD changes: "+ bufferedUpdatesStream.any());
      }
    }
    return isCurrent;
  }

 

参考:

近实时搜索NRT(三)-html

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/423254.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号