public class TransportService extends AbstractLifecycleComponent
TransportService:方法:1
/**
 * Sends {@code request} for {@code action} on the given connection by passing every
 * argument, unchanged, to {@code asyncSender}.
 *
 * @param connection connection the request is sent on
 * @param action     name of the transport action
 * @param request    request payload
 * @param options    transport options for this request
 * @param handler    handler passed along for the response
 * @param <T>        response type accepted by {@code handler}
 */
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
                                                            final TransportRequest request,
                                                            final TransportRequestOptions options,
                                                            TransportResponseHandler<T> handler) {
    asyncSender.sendRequest(connection, action, request, options, handler);
}
TransportService有参构造触发
// Constructor excerpt: the interceptor is handed sendRequestInternal and returns the sender
// stored in asyncSender, which is the object sendRequest forwards its arguments to.
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
TransportService:方法:2
privatevoid sendRequestInternal(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { // 发现节点 DiscoveryNode node = connection.getNode(); final long requestId = transport.newRequestId(); final TimeoutHandler timeoutHandler; try { Supplier storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true); TransportResponseHandler responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler); clientHandlers.put(requestId, new RequestHolder<>(responseHandler, connection, action, timeoutHandler)); // 发送内部请求 connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream } catch (final Exception e) { } }
TransportService:方法:3
/**
 * Forwards the request straight to {@code sendLocalRequest} with the same arguments.
 */
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
// Internal RPC requests are sent through TransportService#sendRequest.
// sendRequest checks whether the destination node is the local node; if it is,
// sendLocalRequest looks up the handler for the action directly and calls the matching handler function.
// (TransportService#sendRequest)
sendLocalRequest(requestId, action, request, options);
}
TransportService:方法:4
/**
 * Handles a request addressed to the local node (for example a ShardSearchTransportRequest
 * that queries a local shard) by looking up the handler registered for {@code action} and
 * invoking it directly on a {@code DirectResponseChannel}.
 * <p>
 * Any exception raised while looking up or invoking the handler is sent back through the
 * channel instead of being dropped. If that notification itself fails, the failure is
 * logged with the original exception attached as suppressed.
 *
 * @param requestId id of the request being handled
 * @param action    name of the transport action, used to find the registered handler
 * @param request   request payload passed to the handler
 * @param options   transport options (not read by this excerpt)
 */
private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
    final DirectResponseChannel channel = new DirectResponseChannel(logger, localNode, action, requestId, adapter, threadPool);
    try {
        // Look up the registration for this action and the name of the executor it should run on.
        final RequestHandlerRegistry reg = adapter.getRequestHandler(action);
        final String executor = reg.getExecutor();
        if (ThreadPool.Names.SAME.equals(executor)) {
            // The walkthrough continues inside this call.
            //noinspection unchecked
            reg.processMessageReceived(request, channel);
        }
        // NOTE(review): only the SAME-executor path appears in this excerpt; a handler registered
        // with any other executor name is not dispatched here.
    } catch (Exception e) {
        // Report the failure to the requester rather than swallowing it.
        try {
            channel.sendResponse(e);
        } catch (Exception inner) {
            inner.addSuppressed(e);
            logger.warn("failed to notify channel of error message for action [" + action + "]", inner);
        }
    }
}
2.RequestHandlerRegistry
public class RequestHandlerRegistry
/**
 * Registers a task for the incoming request and hands the request to {@code handler}.
 * <p>
 * If no task is created, the handler is called with the request and channel only. Otherwise
 * the channel is wrapped in a {@code TransportChannelWrapper} carrying the task, and the
 * task is unregistered here when the handler call does not complete normally.
 *
 * @param request incoming request
 * @param channel channel the handler is given for its response
 * @throws Exception whatever the handler invocation throws
 */
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
    final Task registeredTask = taskManager.register(channel.getChannelType(), action, request);

    // No task was created (the original note says this path is taken at startup).
    if (registeredTask == null) {
        handler.messageReceived(request, channel);
        return;
    }

    boolean handlerReturned = false;
    try {
        // Built inside the try so that a failure here also unregisters the task.
        final TransportChannelWrapper taskChannel = new TransportChannelWrapper(taskManager, registeredTask, channel);
        handler.messageReceived(request, taskChannel, registeredTask);
        handlerReturned = true;
    } finally {
        if (!handlerReturned) {
            taskManager.unregister(registeredTask);
        }
    }
}
3.SearchTransportService
public class SearchTransportService extends AbstractComponent
new TaskAwareTransportRequestHandler
// Registers the handler for QUERY_ACTION_NAME on the SAME executor. The handler runs the query
// phase through searchService and reports the outcome — result or failure — on the channel.
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
    new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
        @Override
        public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
            searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
                @Override
                public void onResponse(SearchPhaseResult searchPhaseResult) {
                    try {
                        channel.sendResponse(searchPhaseResult);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }

                // Without this override a failed query phase would never be reported to the requester.
                @Override
                public void onFailure(Exception e) {
                    try {
                        channel.sendResponse(e);
                    } catch (IOException sendFailure) {
                        throw new UncheckedIOException(sendFailure);
                    }
                }
            });
        }
    });
4.SearchService
public class SearchService extends AbstractLifecycleComponent implements IndexEventListener
public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListenerlistener) { rewriteShardRequest(request, new ActionListener () { @Override public void onResponse(ShardSearchRequest request) { try { // 监听search之后的结果SearchService#SearchService"); listener.onResponse(executeQueryPhase(request, task)); } catch (Exception e) { onFailure(e); } }); }
private void rewriteShardRequest(ShardSearchRequest request, ActionListener5.Rewriteablelistener) { // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), ActionListener.wrap(r -> threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() { @Override protected void doRun() throws Exception { listener.onResponse(request); } }), listener::onFailure)); }
public interface Rewriteable
static > void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener rewriteResponse) {
rewriteAndFetch(original, context, rewriteResponse, 0);
}
static> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener rewriteResponse, int iteration) { T builder = original; try { for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder; rewrittenBuilder = builder.rewrite(context)) { builder = rewrittenBuilder; if (context.hasAsyncActions()) { T finalBuilder = builder; final int currentIterationNumber = iteration; context.executeAsyncActions(ActionListener.wrap(n -> rewriteAndFetch(finalBuilder, context, rewriteResponse, currentIterationNumber), rewriteResponse::onFailure)); return; } } rewriteResponse.onResponse(builder); } catch (IOException ex) { rewriteResponse.onFailure(ex); } }
rewriteResponse.onResponse(builder);响应到,看第四阶段



