栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

elasticsearch源码关于TransportSearchAction【阶段二】

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

elasticsearch源码关于TransportSearchAction【阶段二】

回顾:TransportSearchAction#executeSearch

searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
    Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards).start();
1.AbstractSearchAsyncAction

abstract class AbstractSearchAsyncAction extends InitialSearchPhase
AbstractSearchAsyncAction:方法1

public final void start() {
    executePhase(this);
}

AbstractSearchAsyncAction:方法2
根据不同的SearchPhase 的子类执行,比如SearchQueryThenFetchAsyncAction

  private void executePhase(SearchPhase phase) {
        try {
            phase.run(); 
        } catch (Exception e) {
        }
    }

行父类的父类的方法InitialSearchPhase#run
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
abstract class AbstractSearchAsyncAction extends InitialSearchPhase
abstract class InitialSearchPhase extends SearchPhase

2.InitialSearchPhase

abstract class InitialSearchPhase extends SearchPhase
InitialSearchPhase:方法1

 @Override
    public final void run() throws IOException {
        if (shardsIts.size() > 0) {
            // 取最小分片
            int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
            final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
            assert success;
            for (int index = 0; index < maxConcurrentShardRequests; index++) {
                // 获取分片路由
                final SearchShardIterator shardRoutings = shardsIts.get(index);
                assert shardRoutings.skip() == false;
                if (!Objects.equals(shardRoutings.shardId().getIndexName(), ".kibana")) {
                    System.out.println("【search】InitialSearchPhase ->SearchPhase[抽象类] " +
                        "->CheckedRunnable[接口]#run,执行shard级请求");
                }
                performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
            }
        }
    }

InitialSearchPhase:方法2
关注:onShardResult(result, shardIt)

 private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {

        final Thread thread = Thread.currentThread();
            try {
                executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(),
                    shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
                    @Override
                    public void innerOnResponse(FirstResult result) {
                        if (!Objects.equals(result.getSearchShardTarget().getShardId().getIndexName(),".kibana")){
                            System.out.println("InitialSearchPhase#performPhaseOnShard返回结果 -> 实现接口SearchActionListener#innerOnResponse,");
                        }
                        maybeFork(thread, () -> onShardResult(result, shardIt));
                    }

                    @Override
                    public void onFailure(Exception t) {
                        maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
                    }
                });
            } catch (final Exception e) {
            }
        }
3.SearchQueryThenFetchAsyncAction

final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
转发分片,连接信息,节点信息,分片信息,任务信息

 protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
                                       final SearchActionListener listener) {
        getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
            buildShardSearchRequest(shardIt), getTask(), listener);
    }
4.SearchTransportService

public class SearchTransportService extends AbstractComponent

  //  转发分片,连接信息,节点信息,分片信息,任务信息
    public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
                                 final SearchActionListener listener) {
        // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
        // this used to be the QUERY_AND_FETCH which doesn't exist anymore
        final boolean fetchDocuments = request.numberOfShards() == 1;
        Supplier supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;

        final ActionListener handler = responseWrapper.apply(connection, listener);//比如 SearchExecutionStatsCollector
        transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
            new ActionListenerResponseHandler<>(handler, supplier));
    }
5.TransportService

public class TransportService extends AbstractLifecycleComponent
TransportService :方法:1

   public  void sendChildRequest(final Transport.Connection connection, final String action,
                                                               final TransportRequest request, final Task parentTask,
                                                               final TransportResponseHandler handler) {
        sendChildRequest(connection, action, request, parentTask, TransportRequestOptions.EMPTY, handler);
    }

TransportService :方法:2

public  void sendChildRequest(final Transport.Connection connection, final String action,
                                                               final TransportRequest request, final Task parentTask,
                                                               final TransportRequestOptions options,
                                                               final TransportResponseHandler handler) {
        request.setParentTask(localNode.getId(), parentTask.getId());
        try {
            sendRequest(connection, action, request, options, handler);
        } catch (TaskCancelledException ex) {
        }

    }

TransportService :方法:3
异步发送请求

 public final  void sendRequest(final Transport.Connection connection, final String action,
                                                                final TransportRequest request,
                                                                final TransportRequestOptions options,
                                                                TransportResponseHandler handler) {

        asyncSender.sendRequest(connection, action, request, options, handler);
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/843169.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号