栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

elasticsearch源码关于TransportSearchAction【阶段三】

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

elasticsearch源码关于TransportSearchAction【阶段三】

1.回顾.TransportService

public class TransportService extends AbstractLifecycleComponent
TransportService:方法:1

  public final  void sendRequest(final Transport.Connection connection, final String action,
                                                                final TransportRequest request,
                                                                final TransportRequestOptions options,
                                                                TransportResponseHandler handler) {

        asyncSender.sendRequest(connection, action, request, options, handler);
    }

TransportService有参构造触发
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);

TransportService:方法:2

 private  void sendRequestInternal(final Transport.Connection connection, final String action,
                                                                   final TransportRequest request,
                                                                   final TransportRequestOptions options,
                                                TransportResponseHandler handler) {
        // 发现节点                                                   
        DiscoveryNode node = connection.getNode();
        final long requestId = transport.newRequestId();
        final TimeoutHandler timeoutHandler;
        try {
            Supplier storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
            TransportResponseHandler responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
            clientHandlers.put(requestId, new RequestHolder<>(responseHandler, connection, action, timeoutHandler));
            // 发送内部请求
            connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
        } catch (final Exception e) {
          
        }
    }

TransportService:方法:3

  @Override
        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
            throws IOException, TransportException {
            //内部的RPC请求通过TransportService#sendRequest发送
            //sendRequest会检查目的节点是否是本节点,如果是本节点,
            //则在sendLocalRequest方法中直接通过action 获取对应的Handler,调用对应的处理函数
            //TransportService#sendRequest
            sendLocalRequest(requestId, action, request, options);
        }

TransportService:方法:4

  //ShardSearchTransportRequest,请求本地分片索引
    private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {

        final DirectResponseChannel channel = new DirectResponseChannel(logger, localNode, action, requestId, adapter, threadPool);
        try {
            // sendLocalRequest方法中直接通过action 获取对应的Handler,调用对应的处理函
            final RequestHandlerRegistry reg = adapter.getRequestHandler(action);// 获取action对应的注册信息
            final String executor = reg.getExecutor();// 获取执行器
            if (ThreadPool.Names.SAME.equals(executor)) {
                //noinspection unchecked,#这个方法往下走
                reg.processMessageReceived(request, channel);// 处理task任务
            } 
        } catch (Exception e) {
        
        }
    }
2.RequestHandlerRegistry

public class RequestHandlerRegistry

 public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
        final Task task = taskManager.register(channel.getChannelType(), action, request);// 创建任务
        if (task == null) {
            handler.messageReceived(request, channel);// 启动会走这里
        } else {
            boolean success = false;
            try {
                handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task);
                success = true;
            } finally {
                if (success == false) {
                    taskManager.unregister(task);
                }
            }
        }
    }
3.SearchTransportService

public class SearchTransportService extends AbstractComponent
new TaskAwareTransportRequestHandler

   transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
            new TaskAwareTransportRequestHandler() {
                @Override
                public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
                    searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener() {
                        @Override
                        public void onResponse(SearchPhaseResult searchPhaseResult) {
                            try {
                                channel.sendResponse(searchPhaseResult);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }
                    });
                }
            });
4.SearchService

public class SearchService extends AbstractLifecycleComponent implements IndexEventListener

public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListener listener) {
    rewriteShardRequest(request, new ActionListener() {
        @Override
        public void onResponse(ShardSearchRequest request) {
            try {
                // 监听search之后的结果SearchService#SearchService");
                listener.onResponse(executeQueryPhase(request, task));
            } catch (Exception e) {
                onFailure(e);
            }
    });
}
  private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) {
        // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
        // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not
        // adding a lot of overhead
        Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis),
            ActionListener.wrap(r ->
                threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() {
                    @Override
                    protected void doRun() throws Exception {
                        listener.onResponse(request);
                    }
                }), listener::onFailure));
    }
5.Rewriteable

public interface Rewriteable

    static > void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener rewriteResponse) {
        rewriteAndFetch(original, context, rewriteResponse, 0);
    }
static > void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener
    rewriteResponse, int iteration) {
    T builder = original;
    try {
        for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder;
             rewrittenBuilder = builder.rewrite(context)) {
            builder = rewrittenBuilder;
            if (context.hasAsyncActions()) {
                T finalBuilder = builder;
                final int currentIterationNumber = iteration;
                context.executeAsyncActions(ActionListener.wrap(n -> rewriteAndFetch(finalBuilder, context, rewriteResponse,
                    currentIterationNumber), rewriteResponse::onFailure));
                return;
            }
        }
        rewriteResponse.onResponse(builder);
    } catch (IOException ex) {
        rewriteResponse.onFailure(ex);
    }
}

rewriteResponse.onResponse(builder);响应到,看第四阶段

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/841330.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号