栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

elasticsearch源码关于TransportSearchAction【阶段四】

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

elasticsearch源码关于TransportSearchAction【阶段四】

1.回顾InitialSearchPhase

abstract class InitialSearchPhase extends SearchPhase
1.关注SearchActionListener构造函数
2.关注SearchShardTarget构造函数
3.onShardResult(result, shardIt)

/**
 * Runs the first search phase against a single shard copy.
 * <p>
 * Delegates to {@code executePhaseOnShard}, passing a {@code SearchActionListener} that is bound to a
 * {@code SearchShardTarget} and to {@code shardIndex}. When the shard responds, the listener forwards the
 * result to {@code onShardResult} through {@code maybeFork}.
 *
 * @param shardIndex index handed to the {@code SearchActionListener} constructor
 * @param shardIt    iterator supplying the shard id, cluster alias and original indices for the target
 * @param shard      routing entry whose current node id is used for the target
 */
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {

        // Remember the calling thread; it is passed to maybeFork when the shard responds.
        final Thread thread = Thread.currentThread();
            try {
                // The SearchShardTarget is built from the shard's current node id plus the iterator's shard id,
                // cluster alias and original indices.
                executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(),
                    shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
                    @Override
                    public void innerOnResponse(FirstResult result) {
                        // Hand the shard's result to onShardResult via maybeFork.
                        maybeFork(thread, () -> onShardResult(result, shardIt));
                    }
                });
            } catch (final Exception e) {
                // NOTE(review): this catch body is empty in the excerpt, so an exception thrown while
                // dispatching the phase is dropped here. The failure handling looks elided from the quoted
                // source -- confirm against the upstream file before relying on this snippet.
            }
        }

入口

2.AbstractRunnable

public abstract class AbstractRunnable implements Runnable

 /**
  * Template method for the runnable: executes {@code doRun()}, routes any {@code Exception} it throws to
  * {@code onFailure}, and always calls {@code onAfter()} afterwards.
  * <p>
  * Declared {@code final}, so subclasses customise behaviour through {@code doRun}, {@code onFailure} and
  * {@code onAfter} rather than by overriding this method.
  */
 @Override
    public final void run() {
        try {
//            Disabled debug print: System.out.println("multi-threaded AbstractRunnable -> Runnable [interface]");
            doRun();
        } catch (Exception t) {
            // Only Exception is caught here; other Throwables propagate to the caller.
            onFailure(t);
        } finally {
            // Runs on both the success and the failure path.
            onAfter();
        }
    }

AbstractRunnable内部类
in.doRun() ->$SearchService

/**
 * Wrapper around another {@code AbstractRunnable} ({@code in}) that adjusts context before delegating to it.
 * <p>
 * NOTE(review): this excerpt is truncated -- the class body is not closed, the catch block is empty, and the
 * declarations of {@code threadsOriginalContext} and {@code creatorsContext} are not shown. Confirm details
 * against the upstream source.
 */
private class ContextPreservingAbstractRunnable extends AbstractRunnable {
// The wrapped runnable whose doRun() is invoked below.
private final AbstractRunnable in;
    @Override
        protected void doRun() throws Exception {
            // Set to true just before in.doRun() and back to false once it returns normally.
            boolean whileRunning = false;
            // Keep whatever stashContext() returns; presumably the running thread's own context -- TODO confirm.
            threadsOriginalContext = stashContext();
            try {
                // Restore the context held in creatorsContext before running the wrapped task.
                creatorsContext.restore();
                whileRunning = true;
                in.doRun();
                whileRunning = false;
            } catch (IllegalStateException ex) {
                // NOTE(review): handler body is missing from the excerpt; whileRunning is presumably consulted
                // here in the full source -- verify.
        }
}
3.SearchService

public class SearchService extends AbstractLifecycleComponent implements IndexEventListener
SearchService 方法 1
listener.onResponse(request)
listener -> $ShardSearchRequest
request -> $ShardSearchTransportRequest

   /**
    * Rewrites the shard-level request and then notifies {@code listener} from the SEARCH thread pool.
    * <p>
    * {@code Rewriteable.rewriteAndFetch} is called with the request's rewriteable and a rewrite context obtained
    * from {@code indicesService}. On success a task is submitted to the {@code Names.SEARCH} executor that calls
    * {@code listener.onResponse(request)}; on failure the exception goes to {@code listener.onFailure}.
    *
    * @param request  the shard search request to rewrite; this same object is what the listener receives
    * @param listener callback invoked with the request after the rewrite, or with the failure
    */
   private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) {
        // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
        // AliasFilters that might need to be rewritten. These are edge-cases but we are very efficient doing the rewrite here so it's not
        // adding a lot of overhead
        Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis),
            // The rewrite result `r` is not used; the original request object is passed on.
            ActionListener.wrap(r ->
                threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() {
                    // NOTE(review): no onFailure override is shown for this AbstractRunnable; it looks elided
                    // from the excerpt -- confirm against upstream.
                    @Override
                    protected void doRun() throws Exception {
                        listener.onResponse(request);
                    }
                }), listener::onFailure));
    }

SearchService 方法 2
executeQueryPhase(request, task)
request -> $ShardSearchTransportRequest
task -> $SearchTask

 /**
  * Asynchronous entry point for the query phase on a shard.
  * <p>
  * First runs {@code rewriteShardRequest}; once that reports the request back, calls the two-argument overload
  * {@code executeQueryPhase(request, task)} and passes its return value to {@code listener.onResponse}.
  *
  * @param request  the shard search request
  * @param task     the search task passed through to the two-argument overload
  * @param listener receives the value returned by {@code executeQueryPhase(request, task)}
  */
 public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListener listener) {
        rewriteShardRequest(request, new ActionListener() {
            // NOTE(review): no onFailure override is shown for this listener; presumably elided from the
            // excerpt -- confirm against upstream.
            @Override
            public void onResponse(ShardSearchRequest request) {
                try {
                    listener.onResponse(executeQueryPhase(request, task));
                } catch (Exception e) {
                    // NOTE(review): empty in this excerpt, so a failure of the query phase is dropped and the
                    // outer listener is never notified. Likely elided -- verify against upstream.
                }
            }
        });
    }
4.SearchTransportService

public class SearchTransportService extends AbstractComponent
channel.sendResponse(searchPhaseResult);
channel -> RequestHandlerRegistry.$TransportChannelWrapper
searchPhaseResult -> $QuerySearchResult

   // Registers the handler for QUERY_ACTION_NAME on the transport service. The handler is registered with
   // ThreadPool.Names.SAME as its executor name and ShardSearchTransportRequest::new as the request factory
   // (presumably used to deserialize incoming requests -- TODO confirm).
   transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
            new TaskAwareTransportRequestHandler() {
                @Override
                public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
                    // Run the query phase via SearchService; the task is cast to SearchTask for that call.
                    searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener() {
                        // NOTE(review): no onFailure override is shown here; it looks elided from the excerpt.
                        @Override
                        public void onResponse(SearchPhaseResult searchPhaseResult) {
                            try {
                                // Send the phase result back over the channel the request arrived on.
                                channel.sendResponse(searchPhaseResult);
                            } catch (IOException e) {
                                // onResponse declares no checked exceptions, so the IOException is rethrown
                                // unchecked with the original as its cause.
                                throw new UncheckedIOException(e);
                            }
                        }
                    });
                }
            });
5.RequestHandlerRegistry

public class RequestHandlerRegistry
super.sendResponse(response)
response -> $QuerySearchResult

// Nested class (the surrounding article places it inside RequestHandlerRegistry).
 /**
  * Channel wrapper that calls {@code endTask()} before delegating the response to the parent
  * {@code DelegatingTransportChannel}.
  */
 private static class TransportChannelWrapper extends DelegatingTransportChannel {
        /**
         * Calls {@code endTask()} and then forwards {@code response} to {@code super.sendResponse}.
         *
         * @param response the response to send
         * @throws IOException declared by the overridden method; may be thrown by {@code super.sendResponse}
         */
        @Override
        public void sendResponse(TransportResponse response) throws IOException {
            // endTask() runs first, so it happens even if the send below throws.
            endTask();
            super.sendResponse(response);
        }
    }
6.DelegatingTransportChannel

public class DelegatingTransportChannel implements TransportChannel
channel.sendResponse(response)
channel -> TransportService.$DirectResponseChannel
response -> $QuerySearchResult

  /**
   * Forwards {@code response} to the wrapped {@code channel}.
   * <p>
   * Before delegating, prints two trace lines to standard output when the response is a {@code GetResponse}
   * whose index equals {@code ".kibana"}. The prints look like tracing added for the article rather than
   * upstream code -- TODO confirm.
   *
   * @param response the response to forward
   * @throws IOException declared by the overridden method; may be thrown by {@code channel.sendResponse}
   */
  @Override
    public void sendResponse(TransportResponse response) throws IOException {
        // Trace output only; it does not change what is sent.
        if (response instanceof GetResponse) {
            GetResponse getResponse = (GetResponse) response;
            if (Objects.equals(getResponse.getIndex(), ".kibana")) {
                System.out.println("DelegatingTransportChannel#sendResponse -> (GetResponse) response");
                System.out.println("TransportChannel#sendResponse");
            }
        }
        // The actual work: delegate to the wrapped channel.
        channel.sendResponse(response);
    }
7.TransportService

public class TransportService extends AbstractLifecycleComponent
TransportService方法:1
sendResponse(response, TransportResponseOptions.EMPTY);
response -> $QuerySearchResult

 /**
  * {@code TransportChannel} implementation shown here only for its single-argument {@code sendResponse}.
  * NOTE(review): the rest of the class (fields, the two-argument overload) is omitted from this excerpt.
  */
 static class DirectResponseChannel implements TransportChannel {
    /**
     * Convenience overload: sends {@code response} using {@code TransportResponseOptions.EMPTY}.
     *
     * @param response the response to send
     * @throws IOException declared by the overridden method; may be thrown by the two-argument overload
     */
    @Override
        public void sendResponse(TransportResponse response) throws IOException {
            sendResponse(response, TransportResponseOptions.EMPTY);
        }
 }

TransportService方法:2

/**
 * Delivers {@code response} to the handler registered for {@code requestId}.
 * <p>
 * Notifies {@code adapter} that the response was sent, then asks it for the response handler. If a handler is
 * returned, the response is processed on the calling thread when the handler's executor name equals
 * {@code ThreadPool.Names.SAME}; otherwise it is processed on the thread pool executor with that name.
 * <p>
 * The {@code System.out.println} calls look like tracing added for the article rather than upstream code --
 * TODO confirm. They fire only for responses whose index is NOT {@code ".kibana"}.
 *
 * @param response the response to deliver
 * @param options  response options, passed on to {@code adapter.onResponseSent}
 * @throws IOException declared by the overridden method
 */
@Override
        public void sendResponse(final TransportResponse response, TransportResponseOptions options) throws IOException {
            adapter.onResponseSent(requestId, action, response, options);
            final TransportResponseHandler handler = adapter.onResponseReceived(requestId);
            // ignore if its null, the adapter logs it
            if (handler != null) {
                final String executor = handler.executor();
                if (ThreadPool.Names.SAME.equals(executor)) {
                    // Trace output for query results, skipping the ".kibana" index.
                    if (response instanceof QuerySearchResult) {
                        QuerySearchResult querySearchResult = (QuerySearchResult) response;
                        SearchShardTarget searchShardTarget = querySearchResult.getSearchShardTarget();
                        if (!Objects.equals(searchShardTarget.getShardId().getIndexName(),".kibana")){
                            System.out.println("发送响应结果,TransportService#sendResponse ->(QuerySearchResult) response");
                        }
                    }
                    // Trace output for get responses, skipping the ".kibana" index.
                    if (response instanceof GetResponse) {
                        GetResponse getResponse = (GetResponse) response;
                        if (!Objects.equals(getResponse.getIndex(),".kibana")){
                            System.out.println("发送响应结果,TransportService#sendResponse ->(GetResponse) response");
                        }
                    }
                    // SAME executor: process the response on the calling thread.
                    processResponse(handler, response);
                } else {
                    // Any other executor name: process the response on that thread pool executor.
                    threadPool.executor(executor).execute(() -> processResponse(handler, response));
                }
            }
        }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/848226.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号