abstract class InitialSearchPhase extends SearchPhase
1.关注SearchActionListener构造函数
2.关注SearchShardTarget构造函数
3.onShardResult(result, shardIt)
/**
 * Executes this phase on a single shard copy and hands the shard's response to
 * {@code onShardResult}.
 *
 * @param shardIndex index of the shard; passed to the response listener
 * @param shardIt    shard iterator supplying the shard id, cluster alias and original indices
 *                   used to build the {@code SearchShardTarget}
 * @param shard      routing entry of the copy to query; its current node id becomes the target node
 */
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
    // The calling thread is captured here and passed to maybeFork together with the response callback.
    final Thread thread = Thread.currentThread();
    try {
        // The listener is parameterized with FirstResult so that innerOnResponse(FirstResult) really
        // overrides the listener's callback (a raw SearchActionListener would not match the @Override).
        executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
            shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
            @Override
            public void innerOnResponse(FirstResult result) {
                maybeFork(thread, () -> onShardResult(result, shardIt));
            }
        });
    } catch (final Exception e) {
        // NOTE(review): the exception is silently dropped in this excerpt, so a failure to dispatch
        // the request is never reported for this shard. Presumably the full source forwards it to a
        // shard-failure handler — confirm and restore that handling.
    }
}
入口
2.AbstractRunnable
public abstract class AbstractRunnable implements Runnable
/**
 * Runs the task: invokes doRun(), hands any Exception it throws to onFailure(Exception),
 * and calls onAfter() in the finally block regardless of the outcome.
 */
@Override
public final void run() {
try {
// (disabled debug trace) System.out.println("multi-threaded AbstractRunnable -> Runnable [interface]");
doRun();
} catch (Exception t) {
// only Exception is caught here; anything else thrown by doRun() propagates to the caller
onFailure(t);
} finally {
// executed after doRun() completes normally and also after the failure path
onAfter();
}
}
AbstractRunnable内部类
in.doRun() ->$SearchService
/**
 * Wrapper around another AbstractRunnable ({@code in}) whose doRun() stashes the current context,
 * restores {@code creatorsContext}, and then delegates to the wrapped task.
 *
 * NOTE(review): abridged excerpt — the constructor, the declarations of threadsOriginalContext and
 * creatorsContext, and the class's closing brace are not shown here.
 */
private class ContextPreservingAbstractRunnable extends AbstractRunnable {
// the wrapped task that is actually executed
private final AbstractRunnable in;
@Override
protected void doRun() throws Exception {
// set to true only for the duration of in.doRun(); it is never read within this excerpt
boolean whileRunning = false;
// keep whatever stashContext() returns before the creator's context is restored
threadsOriginalContext = stashContext();
try {
creatorsContext.restore();
whileRunning = true;
in.doRun();
whileRunning = false;
} catch (IllegalStateException ex) {
// NOTE(review): IllegalStateException is swallowed here, including one thrown by in.doRun();
// presumably the full source rethrows depending on whileRunning — confirm.
}
}
3.SearchService
public class SearchService extends AbstractLifecycleComponent implements IndexEventListener
SearchService 方法 1
listener.onResponse(request)
listener -> $ShardSearchRequest
request -> $ShardSearchTransportRequest
private void rewriteShardRequest(ShardSearchRequest request, ActionListenerlistener) { // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), ActionListener.wrap(r -> threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() { @Override protected void doRun() throws Exception { listener.onResponse(request); } }), listener::onFailure)); }
SearchService 方法 2
executeQueryPhase(request, task)
request -> $ShardSearchTransportRequest
task -> $SearchTask
public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListener4.SearchTransportServicelistener) { rewriteShardRequest(request, new ActionListener () { @Override public void onResponse(ShardSearchRequest request) { try { listener.onResponse(executeQueryPhase(request, task)); } catch (Exception e) { } } }); }
public class SearchTransportService extends AbstractComponent
channel.sendResponse(searchPhaseResult);
channel -> RequestHandlerRegistry.$TransportChannelWrapper
searchPhaseResult -> $QuerySearchResult
// Registers the handler for QUERY_ACTION_NAME on the SAME executor: an incoming
// ShardSearchTransportRequest is passed to SearchService#executeQueryPhase and the resulting
// SearchPhaseResult is written back through the transport channel.
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
    // type arguments are spelled out so the @Override methods below match the generic callbacks
    new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
        @Override
        public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
            searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
                @Override
                public void onResponse(SearchPhaseResult searchPhaseResult) {
                    try {
                        channel.sendResponse(searchPhaseResult);
                    } catch (IOException e) {
                        // onResponse cannot throw a checked exception, so the IOException is wrapped
                        throw new UncheckedIOException(e);
                    }
                }
                // NOTE(review): this excerpt shows no onFailure override; presumably the full source
                // sends the exception back over the channel — confirm before relying on this snippet.
            });
        }
    });
5.RequestHandlerRegistry
public class RequestHandlerRegistry
super.sendResponse(response)
response -> $QuerySearchResult
// Inner class
/**
 * Channel wrapper that calls endTask() before delegating the response to the superclass.
 * NOTE(review): abridged excerpt — the constructor and endTask() are not shown here.
 */
private static class TransportChannelWrapper extends DelegatingTransportChannel {
@Override
public void sendResponse(TransportResponse response) throws IOException {
// endTask() runs first; the response is only forwarded afterwards
endTask();
super.sendResponse(response);
}
}
6.DelegatingTransportChannel
public class DelegatingTransportChannel implements TransportChannel
channel.sendResponse(response)
channel -> TransportService$DirectResponseChannel
response -> $QuerySearchResult
/**
 * Forwards the response to the wrapped channel. Before forwarding, two debug trace lines are
 * printed when the response is a GetResponse whose index equals ".kibana".
 *
 * @param response response to forward
 * @throws IOException if the wrapped channel fails to send the response
 */
@Override
public void sendResponse(TransportResponse response) throws IOException {
    // NOTE(review): this trace fires only FOR ".kibana", while the traces in
    // TransportService#sendResponse fire for everything EXCEPT ".kibana" — confirm which is intended.
    final boolean kibanaGetResponse = response instanceof GetResponse
        && Objects.equals(((GetResponse) response).getIndex(), ".kibana");
    if (kibanaGetResponse) {
        System.out.println("DelegatingTransportChannel#sendResponse -> (GetResponse) response");
        System.out.println("TransportChannel#sendResponse");
    }
    channel.sendResponse(response);
}
7.TransportService
public class TransportService extends AbstractLifecycleComponent
TransportService方法:1
sendResponse(response, TransportResponseOptions.EMPTY);
response -> $QuerySearchResult
/**
 * TransportChannel implementation nested in TransportService.
 * NOTE(review): abridged excerpt — fields, constructor and the two-argument sendResponse overload
 * are not shown inside this class body here.
 */
static class DirectResponseChannel implements TransportChannel {
@Override
public void sendResponse(TransportResponse response) throws IOException {
// delegate to the two-argument overload with the EMPTY response options
sendResponse(response, TransportResponseOptions.EMPTY);
}
}
TransportService方法:2
/**
 * Notifies the adapter that a response was sent, looks up the response handler for this request,
 * and processes the response either inline (handler executor is SAME) or on the handler's
 * executor. On the inline path a debug trace line is printed for QuerySearchResult and
 * GetResponse instances whose index is not ".kibana".
 *
 * @param response response to deliver to the handler
 * @param options  response options, passed on to the adapter
 * @throws IOException declared by the channel contract
 */
@Override
public void sendResponse(final TransportResponse response, TransportResponseOptions options) throws IOException {
    adapter.onResponseSent(requestId, action, response, options);
    final TransportResponseHandler handler = adapter.onResponseReceived(requestId);
    if (handler == null) {
        // nothing to do: the adapter has already logged the missing handler
        return;
    }

    final String executorName = handler.executor();
    if (!ThreadPool.Names.SAME.equals(executorName)) {
        // hand the response over to the executor the handler asked for
        threadPool.executor(executorName).execute(() -> processResponse(handler, response));
        return;
    }

    // Inline path: emit the debug traces, then process on the calling thread.
    if (response instanceof QuerySearchResult) {
        final SearchShardTarget target = ((QuerySearchResult) response).getSearchShardTarget();
        final String indexName = target.getShardId().getIndexName();
        if (!Objects.equals(indexName, ".kibana")) {
            System.out.println("发送响应结果,TransportService#sendResponse ->(QuerySearchResult) response");
        }
    }
    if (response instanceof GetResponse) {
        final String indexName = ((GetResponse) response).getIndex();
        if (!Objects.equals(indexName, ".kibana")) {
            System.out.println("发送响应结果,TransportService#sendResponse ->(GetResponse) response");
        }
    }
    processResponse(handler, response);
}



