回顾:TransportSearchAction#executeSearch
// Create the search action via searchAsyncAction(...) and call start() on the result right away.
// The alias filter map is handed over wrapped in Collections.unmodifiableMap.
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards).start();
1.AbstractSearchAsyncAction
abstract class AbstractSearchAsyncAction extends InitialSearchPhase
AbstractSearchAsyncAction:方法1
/**
 * Entry point of the action: passes this instance to {@code executePhase}.
 */
public final void start() {
    executePhase(this);
}
AbstractSearchAsyncAction:方法2
根据不同的SearchPhase 的子类执行,比如SearchQueryThenFetchAsyncAction
/**
 * Executes the given phase by invoking its {@code run()} method.
 *
 * @param phase the phase to execute
 */
private void executePhase(SearchPhase phase) {
    try {
        phase.run();
    } catch (Exception ignored) {
        // NOTE(review): nothing is done with the exception here, so a failing phase is
        // silently discarded. Presumably the handling was elided from this excerpt -
        // confirm against the upstream source before relying on this behavior.
    }
}
执行父类的父类的方法 InitialSearchPhase#run
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
abstract class AbstractSearchAsyncAction extends InitialSearchPhase
abstract class InitialSearchPhase extends SearchPhase
2.InitialSearchPhase
abstract class InitialSearchPhase extends SearchPhase
InitialSearchPhase:方法1
/**
 * Starts the shard-level requests of this phase.
 * <p>
 * At most {@code min(maxConcurrentShardRequests, shardsIts.size())} shards are dispatched from
 * here, and {@code shardExecutionIndex} is moved from 0 to that count before dispatching.
 * Nothing happens when there are no shard iterators.
 *
 * @throws IOException declared by the signature
 */
@Override
public final void run() throws IOException {
    if (shardsIts.size() <= 0) {
        return;
    }
    // Never dispatch more requests than there are shards.
    final int initialBatch = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
    final boolean claimed = shardExecutionIndex.compareAndSet(0, initialBatch);
    assert claimed;
    for (int slot = 0; slot < initialBatch; slot++) {
        // Routing information for the shard handled in this slot.
        final SearchShardIterator routing = shardsIts.get(slot);
        assert !routing.skip();
        // Debug trace, suppressed for the ".kibana" index.
        final boolean kibanaIndex = Objects.equals(routing.shardId().getIndexName(), ".kibana");
        if (!kibanaIndex) {
            System.out.println("【search】InitialSearchPhase ->SearchPhase[抽象类] " +
                "->CheckedRunnable[接口]#run,执行shard级请求");
        }
        performPhaseOnShard(slot, routing, routing.nextOrNull());
    }
}
InitialSearchPhase:方法2
关注:onShardResult(result, shardIt)
/**
 * Issues the phase request for one shard copy and wires up the result and failure callbacks.
 * <p>
 * Listener callbacks are routed through {@code maybeFork} together with the thread that issued
 * the request: a successful result goes to {@code onShardResult}, a failure to
 * {@code onShardFailure}. A failure raised synchronously while issuing the request is reported
 * through the same {@code onShardFailure} path instead of being dropped.
 *
 * @param shardIndex index of the shard, passed on to the listener and to the failure handler
 * @param shardIt    iterator over the copies of the shard
 * @param shard      the shard copy to query; the caller obtains it from {@code nextOrNull()}
 */
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
    final Thread thread = Thread.currentThread();
    try {
        executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(),
            shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
            @Override
            public void innerOnResponse(FirstResult result) {
                // Debug trace, suppressed for the ".kibana" index.
                if (!Objects.equals(result.getSearchShardTarget().getShardId().getIndexName(),".kibana")){
                    System.out.println("InitialSearchPhase#performPhaseOnShard返回结果 -> 实现接口SearchActionListener#innerOnResponse,");
                }
                maybeFork(thread, () -> onShardResult(result, shardIt));
            }

            @Override
            public void onFailure(Exception t) {
                maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
            }
        });
    } catch (final Exception e) {
        // A failure while issuing the request must not be swallowed: report it the same way
        // as an asynchronous failure so the shard is accounted for.
        // The node id is resolved null-safely because shard comes from nextOrNull().
        // NOTE(review): assumes onShardFailure tolerates a null shard / node id - confirm.
        maybeFork(thread, () -> onShardFailure(shardIndex, shard,
            shard == null ? null : shard.currentNodeId(), shardIt, e));
    }
}
3.SearchQueryThenFetchAsyncAction
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
转发分片,连接信息,节点信息,分片信息,任务信息
/**
 * Sends the query request for a single shard copy through the search transport.
 *
 * @param shardIt  iterator for the shard; supplies the cluster alias and the shard-level request
 * @param shard    shard copy whose current node id selects the connection
 * @param listener callback handed to the transport call
 */
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
                                   final SearchActionListener listener) {
    getSearchTransport().sendExecuteQuery(
        getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
        buildShardSearchRequest(shardIt),
        getTask(),
        listener);
}
4.SearchTransportService
public class SearchTransportService extends AbstractComponent
/**
 * Forwards a shard-level query request over the given connection as a child request of
 * {@code task}, carrying the connection, request, task and listener along.
 *
 * @param connection connection to send the request on
 * @param request    shard-level search request
 * @param task       search task passed on to {@code sendChildRequest}
 * @param listener   callback for the shard response, wrapped by {@code responseWrapper} first
 */
public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
                             final SearchActionListener listener) {
    // With a single shard in the search request a QueryFetchSearchResult is expected;
    // this used to be QUERY_AND_FETCH, which no longer exists.
    final Supplier resultFactory;
    if (request.numberOfShards() == 1) {
        resultFactory = QueryFetchSearchResult::new;
    } else {
        resultFactory = QuerySearchResult::new;
    }
    // The wrapper may decorate the listener, e.g. with a SearchExecutionStatsCollector.
    final ActionListener wrappedListener = responseWrapper.apply(connection, listener);
    transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
        new ActionListenerResponseHandler<>(wrappedListener, resultFactory));
}
5.TransportService
public class TransportService extends AbstractLifecycleComponent
TransportService :方法:1
/**
 * Sends {@code request} as a child request of {@code parentTask}, using
 * {@code TransportRequestOptions.EMPTY} as the request options.
 *
 * @param connection connection to send the request on
 * @param action     name of the transport action
 * @param request    request to send
 * @param parentTask task the request is sent on behalf of
 * @param handler    handler for the response
 */
public void sendChildRequest(final Transport.Connection connection, final String action,
                             final TransportRequest request, final Task parentTask,
                             final TransportResponseHandler handler) {
    sendChildRequest(connection, action, request, parentTask, TransportRequestOptions.EMPTY, handler);
}
TransportService :方法:2
/**
 * Marks {@code request} as a child of {@code parentTask} (local node id plus parent task id)
 * and sends it with the given options.
 *
 * @param connection connection to send the request on
 * @param action     name of the transport action
 * @param request    request to send; its parent task is set before sending
 * @param parentTask task the request is sent on behalf of
 * @param options    transport options for this request
 * @param handler    handler for the response
 */
public void sendChildRequest(final Transport.Connection connection, final String action,
                             final TransportRequest request, final Task parentTask,
                             final TransportRequestOptions options, final TransportResponseHandler handler) {
    request.setParentTask(localNode.getId(), parentTask.getId());
    try {
        sendRequest(connection, action, request, options, handler);
    } catch (TaskCancelledException ex) {
        // NOTE(review): the cancellation is discarded here and the handler is never notified.
        // Presumably the handling was elided from this excerpt - confirm against upstream.
    }
}
TransportService :方法:3
异步发送请求
/**
 * Sends the request asynchronously by delegating to {@code asyncSender}.
 *
 * @param connection connection to send the request on
 * @param action     name of the transport action
 * @param request    request to send
 * @param options    transport options for this request
 * @param handler    handler for the response
 */
public final void sendRequest(final Transport.Connection connection, final String action,
                              final TransportRequest request, final TransportRequestOptions options,
                              TransportResponseHandler handler) {
    asyncSender.sendRequest(connection, action, request, options, handler);
}



