
What the DFS phase does

```java
public class DfsPhase implements SearchPhase {

    @Override
    public void preProcess(SearchContext context) {
    }

    @Override
    public void execute(SearchContext context) {
        try {
            // Per-field statistics (maxDoc, docCount, sumTotalTermFreq, sumDocFreq)
            ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap();
            // Per-term statistics (docFreq, totalTermFreq)
            Map<Term, TermStatistics> stats = new HashMap<>();
            // Wrap the shard's reader so that every statistics lookup is also recorded
            IndexSearcher searcher = new IndexSearcher(context.searcher().getIndexReader()) {
                @Override
                public TermStatistics termStatistics(Term term, int docFreq, long totalTermFreq) throws IOException {
                    if (context.isCancelled()) {
                        throw new TaskCancelledException("cancelled");
                    }
                    TermStatistics ts = super.termStatistics(term, docFreq, totalTermFreq);
                    if (ts != null) {
                        stats.put(term, ts);
                    }
                    return ts;
                }

                @Override
                public CollectionStatistics collectionStatistics(String field) throws IOException {
                    if (context.isCancelled()) {
                        throw new TaskCancelledException("cancelled");
                    }
                    CollectionStatistics cs = super.collectionStatistics(field);
                    if (cs != null) {
                        fieldStatistics.put(field, cs);
                    }
                    return cs;
                }
            };

            // Building the weights makes Lucene request the statistics it needs for scoring
            searcher.createWeight(context.searcher().rewrite(context.query()), ScoreMode.COMPLETE, 1);
            // Rescore queries are scored as well, so their terms are collected too
            for (RescoreContext rescoreContext : context.rescore()) {
                for (Query query : rescoreContext.getQueries()) {
                    searcher.createWeight(context.searcher().rewrite(query), ScoreMode.COMPLETE, 1);
                }
            }

            // Copy the map into the parallel arrays stored in the shard's DFS result
            Term[] terms = stats.keySet().toArray(new Term[0]);
            TermStatistics[] termStatistics = new TermStatistics[terms.length];
            for (int i = 0; i < terms.length; i++) {
                termStatistics[i] = stats.get(terms[i]);
            }

            context.dfsResult().termsStatistics(terms, termStatistics)
                    .fieldStatistics(fieldStatistics)
                    .maxDoc(context.searcher().getIndexReader().maxDoc());
        } catch (Exception e) {
            throw new DfsPhaseExecutionException(context.shardTarget(), "Exception during dfs phase", e);
        }
    }

}
```

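In the DFS phase, each shard collects statistics for the terms in the query. It does so by overriding the two `IndexSearcher` methods that Lucene calls while `createWeight` builds the query weights:

  1. `termStatistics`: per-term `docFreq` and `totalTermFreq`
  2. `collectionStatistics`: per-field `maxDoc`, `docCount`, `sumTotalTermFreq` and `sumDocFreq`

The weights themselves are discarded; only the recorded term and field statistics, plus the shard's `maxDoc`, are kept in the shard's `dfsResult()`.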
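
The collection trick can be reproduced without Lucene. In this minimal sketch, `StatsSearcher` and `TermStats` are hypothetical stand-ins for `IndexSearcher` and `TermStatistics`, and the hypothetical `createWeight` simply asks for the statistics of each query term. An anonymous subclass records every non-null answer into a map, in the same way `DfsPhase` does.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DfsCollectSketch {

    // Hypothetical stand-in for Lucene's TermStatistics.
    record TermStats(String term, long docFreq, long totalTermFreq) {}

    // Hypothetical stand-in for IndexSearcher over one shard's documents.
    static class StatsSearcher {
        private final List<List<String>> docs; // each document is a list of tokens

        StatsSearcher(List<List<String>> docs) {
            this.docs = docs;
        }

        TermStats termStatistics(String term) {
            long docFreq = 0;
            long totalTermFreq = 0;
            for (List<String> doc : docs) {
                long tf = doc.stream().filter(term::equals).count();
                if (tf > 0) {
                    docFreq++;
                    totalTermFreq += tf;
                }
            }
            // Like Lucene, return no statistics for a term that does not occur.
            return docFreq == 0 ? null : new TermStats(term, docFreq, totalTermFreq);
        }

        // Hypothetical stand-in for createWeight: scoring needs stats for every query term.
        void createWeight(List<String> queryTerms) {
            for (String t : queryTerms) {
                termStatistics(t);
            }
        }
    }

    // Mirrors DfsPhase.execute: build the weight only to capture the statistics.
    static Map<String, TermStats> collect(List<List<String>> shardDocs, List<String> queryTerms) {
        Map<String, TermStats> stats = new HashMap<>();
        StatsSearcher searcher = new StatsSearcher(shardDocs) {
            @Override
            TermStats termStatistics(String term) {
                TermStats ts = super.termStatistics(term);
                if (ts != null) {
                    stats.put(term, ts); // record as a side effect
                }
                return ts;
            }
        };
        searcher.createWeight(queryTerms);
        return stats;
    }

    public static void main(String[] args) {
        List<List<String>> shard = List.of(
                List.of("quick", "brown", "fox"),
                List.of("quick", "quick", "dog"));
        // Only "quick" is recorded; "cat" does not occur on this shard.
        System.out.println(collect(shard, List.of("quick", "cat")));
    }
}
```

Delegating to `super` leaves the returned statistics unchanged, so recording is a pure side effect. That is why `DfsPhase` can call `createWeight` and throw the weight away.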

The statistics from the DFS round are then used by the query phase on each shard, in `org.elasticsearch.search.SearchService#executeQueryPhase(org.elasticsearch.search.query.QuerySearchRequest, org.elasticsearch.action.search.SearchShardTask, org.elasticsearch.action.ActionListener)`:

```java
public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) {
    runAsync(request.id(), () -> {
        final SearchContext context = findContext(request.id(), request);
        context.setTask(task);
        context.incRef();
        try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
            contextProcessing(context);
            // Install the statistics obtained in the previous (DFS) round as aggregatedDfs
            context.searcher().setAggregatedDfs(request.dfs());
            queryPhase.execute(context);
            if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
                // no hits, we can release the context since there will be no fetch phase
                freeContext(context.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            executor.success();
            return context.queryResult();
        } catch (Exception e) {
            logger.trace("Query phase failed", e);
            processFailure(context, e);
            throw e;
        } finally {
            cleanContext(context);
        }
    }, listener);
}
```
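
How the searcher uses `aggregatedDfs` is not shown above. As far as I can tell, Elasticsearch's `ContextIndexSearcher` overrides `termStatistics` to return the aggregated value when one exists and to fall back to the shard-local value otherwise. The following is a simplified sketch of that lookup; `ShardSearcher` and `TermStats` are hypothetical stand-ins, not Elasticsearch classes.

```java
import java.util.HashMap;
import java.util.Map;

public class AggregatedDfsLookupSketch {

    // Hypothetical stand-in for Lucene's TermStatistics.
    record TermStats(String term, long docFreq, long totalTermFreq) {}

    // Hypothetical shard searcher holding local stats and optional aggregated (global) stats.
    static class ShardSearcher {
        private final Map<String, TermStats> localStats;
        private Map<String, TermStats> aggregatedDfs; // stays null unless a DFS round ran

        ShardSearcher(Map<String, TermStats> localStats) {
            this.localStats = localStats;
        }

        void setAggregatedDfs(Map<String, TermStats> aggregatedDfs) {
            this.aggregatedDfs = aggregatedDfs;
        }

        TermStats termStatistics(String term) {
            if (aggregatedDfs == null) {
                // No DFS round (or we are inside the DFS phase itself): use shard-local stats.
                return localStats.get(term);
            }
            TermStats global = aggregatedDfs.get(term);
            // Fall back to local stats for terms the DFS round did not collect.
            return global != null ? global : localStats.get(term);
        }
    }

    public static void main(String[] args) {
        Map<String, TermStats> local = new HashMap<>();
        local.put("quick", new TermStats("quick", 2, 3));
        local.put("fox", new TermStats("fox", 1, 1));

        ShardSearcher searcher = new ShardSearcher(local);
        System.out.println(searcher.termStatistics("quick").docFreq()); // 2 (local)

        searcher.setAggregatedDfs(Map.of("quick", new TermStats("quick", 10, 14)));
        System.out.println(searcher.termStatistics("quick").docFreq()); // 10 (global)
        System.out.println(searcher.termStatistics("fox").docFreq());   // 1 (fallback)
    }
}
```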

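A `dfs_query_then_fetch` search therefore runs in three rounds:

  1. `DFS_ACTION_NAME`: each shard runs the DFS phase and returns its term and field statistics; the coordinating node merges them.
  2. `QUERY_ID_ACTION_NAME`: the merged statistics are sent to every shard, which runs the query phase with them.
  3. `FETCH_ID_ACTION_NAME`: the fetch phase retrieves the top hits.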
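
The extra round exists because shard-local statistics can give very different IDF values for the same term. The sketch below assumes a simplified per-shard result that holds only `docFreq` per term and the field's `docCount`; `ShardDfs` and `AggregatedDfs` are hypothetical records, not Elasticsearch's classes. It sums the results the way the coordinating node does between steps 1 and 2, then compares local and global IDF using the IDF formula of Lucene's `BM25Similarity`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DfsAggregateSketch {

    // Hypothetical per-shard DFS result: docFreq per term plus the field's docCount.
    record ShardDfs(Map<String, Long> docFreq, long docCount) {}

    // Hypothetical merged statistics sent back to the shards with the query request.
    record AggregatedDfs(Map<String, Long> docFreq, long docCount) {}

    // Between steps 1 and 2: sum the per-shard statistics term by term.
    static AggregatedDfs aggregate(List<ShardDfs> shards) {
        Map<String, Long> docFreq = new HashMap<>();
        long docCount = 0;
        for (ShardDfs shard : shards) {
            shard.docFreq().forEach((term, df) -> docFreq.merge(term, df, Long::sum));
            docCount += shard.docCount();
        }
        return new AggregatedDfs(docFreq, docCount);
    }

    // IDF as computed by Lucene's BM25Similarity.
    static double idf(long docFreq, long docCount) {
        return Math.log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5));
    }

    public static void main(String[] args) {
        // "fox" is rare on shard 0 but common on shard 1.
        ShardDfs shard0 = new ShardDfs(Map.of("fox", 1L), 100);
        ShardDfs shard1 = new ShardDfs(Map.of("fox", 50L), 100);

        System.out.printf("shard0 local idf = %.3f%n", idf(1, 100));
        System.out.printf("shard1 local idf = %.3f%n", idf(50, 100));

        AggregatedDfs global = aggregate(List.of(shard0, shard1));
        long df = global.docFreq().get("fox"); // 51
        System.out.printf("global idf = %.3f%n", idf(df, global.docCount()));
    }
}
```

With local statistics, a document matching `fox` on shard 0 would score much higher than an equally good match on shard 1; after aggregation both shards score with `docFreq = 51` and `docCount = 200`.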
Source: https://www.mshxw.com/it/674086.html