/**
 * Search phase that records the term and field statistics which are looked up
 * while query weights are created for the shard behind a {@link SearchContext}.
 *
 * <p>The statistics are captured by wrapping the context's index reader in an
 * {@link IndexSearcher} whose {@code termStatistics} and
 * {@code collectionStatistics} methods remember every non-null value they return.
 */
public class DfsPhase implements SearchPhase {

    /** This phase needs no preparation; the method is intentionally empty. */
    @Override
    public void preProcess(SearchContext context) {
    }

    /**
     * Creates the weights for the main query and for every rescore query, records
     * the statistics requested along the way, and stores them together with the
     * reader's {@code maxDoc} in {@code context.dfsResult()}.
     *
     * @param context the search context providing the searcher, query, rescorers and result holder
     * @throws DfsPhaseExecutionException wrapping any exception raised while collecting
     */
    @Override
    public void execute(SearchContext context) {
        try {
            // Field name -> collection statistics seen while creating weights.
            // NOTE(review): the factory name is kept exactly as found; it may be a typo
            // for a differently-cased helper (e.g. newNoNullKeysMap) - confirm against HppcMaps.
            ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNonullKeysMap();
            // Term -> term statistics seen while creating weights.
            Map<Term, TermStatistics> stats = new HashMap<>();

            // Recording searcher: delegates to the default implementation and keeps a
            // copy of each non-null answer. Both hooks abort early on cancellation.
            IndexSearcher searcher = new IndexSearcher(context.searcher().getIndexReader()) {
                @Override
                public TermStatistics termStatistics(Term term, int docFreq, long totalTermFreq) throws IOException {
                    if (context.isCancelled()) {
                        throw new TaskCancelledException("cancelled");
                    }
                    TermStatistics ts = super.termStatistics(term, docFreq, totalTermFreq);
                    if (ts != null) {
                        stats.put(term, ts);
                    }
                    return ts;
                }

                @Override
                public CollectionStatistics collectionStatistics(String field) throws IOException {
                    if (context.isCancelled()) {
                        throw new TaskCancelledException("cancelled");
                    }
                    CollectionStatistics cs = super.collectionStatistics(field);
                    if (cs != null) {
                        fieldStatistics.put(field, cs);
                    }
                    return cs;
                }
            };

            // The weights themselves are discarded; createWeight is invoked only so that
            // the overridden statistics methods above get called.
            searcher.createWeight(context.searcher().rewrite(context.query()), ScoreMode.COMPLETE, 1);
            for (RescoreContext rescoreContext : context.rescore()) {
                for (Query query : rescoreContext.getQueries()) {
                    searcher.createWeight(context.searcher().rewrite(query), ScoreMode.COMPLETE, 1);
                }
            }

            // Flatten the map into two parallel arrays: termStatistics[i] belongs to terms[i].
            Term[] terms = stats.keySet().toArray(new Term[0]);
            TermStatistics[] termStatistics = new TermStatistics[terms.length];
            for (int i = 0; i < terms.length; i++) {
                termStatistics[i] = stats.get(terms[i]);
            }

            context.dfsResult().termsStatistics(terms, termStatistics)
                .fieldStatistics(fieldStatistics)
                .maxDoc(context.searcher().getIndexReader().maxDoc());
        } catch (Exception e) {
            throw new DfsPhaseExecutionException(context.shardTarget(), "Exception during dfs phase", e);
        }
    }
}
DFS 阶段收集各个分片上词项和字段的统计信息。收集的方法是重写 IndexSearcher 的两个方法,它们会在 createWeight 时被调用:
- termStatistics
- collectionStatistics
org.elasticsearch.search.SearchService#executeQueryPhase(org.elasticsearch.search.query.QuerySearchRequest, org.elasticsearch.action.search.SearchShardTask, org.elasticsearch.action.ActionListener)
public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListenerlistener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); context.setTask(task); context.incRef(); try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { contextProcessing(context); // 将上一轮获取的dfs 放入aggregatedDfs中 context.searcher().setAggregatedDfs(request.dfs()); queryPhase.execute(context); if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { // no hits, we can release the context since there will be no fetch phase freeContext(context.id()); } else { contextProcessedSuccessfully(context); } executor.success(); return context.queryResult(); } catch (Exception e) { logger.trace("Query phase failed", e); processFailure(context, e); throw e; } finally { cleanContext(context); } }, listener); }
DFS 查询分三步:
- DFS_ACTION_NAME:获取各分片的 dfs 信息
- QUERY_ID_ACTION_NAME:将 dfs 信息传给各个分片,并执行 query
- FETCH_ID_ACTION_NAME:执行 fetch



