
[Elasticsearch] Elasticsearch source code analysis, part 2: the bulk request write path


Start the server in remote debug mode: ./gradlew run --debug-jvm

1. Data preparation

changjing@changjingdeMacBook-Air test % cat a.json
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : { "_id" : "1", "_index" : "test" } }
{ "doc" : { "field2" : "value2" } }

curl -u elastic:password -XPOST http://localhost:9200/_bulk --data-binary @a.json -H 'Content-Type: application/json'


Computing the target shard ID

1) Because custom routing can skew how evenly documents are distributed, a partitionOffset parameter is introduced.
2) With custom routing, partitionOffset is the hash of the id modulo the index's routing_partition_size setting; with default routing, partitionOffset is 0.
3) The shard id is computed as ((hash(id or routing parameter) + partitionOffset) % number_of_routing_shards) / (number_of_routing_shards / number_of_shards), where number_of_shards is the total shard count from the index settings; number_of_routing_shards is also configurable and defaults to number_of_shards.
4) The hash function is Murmur3, a non-cryptographic hash; it must not be used for cryptographic purposes.
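The formula in steps 1–3 can be sketched as follows. This is a minimal illustration, not the Elasticsearch source: hash() below is a stand-in for the real Murmur3 implementation, and the settings plumbing is reduced to plain method parameters.

```java
// Minimal sketch of the shard-id computation described above.
// ASSUMPTION: hash() stands in for Elasticsearch's Murmur3 hash; any
// deterministic non-cryptographic hash illustrates the mechanics.
public class ShardRoutingSketch {
    static int hash(String routing) {
        int h = 0;
        for (int i = 0; i < routing.length(); i++) {
            h = 31 * h + routing.charAt(i); // stand-in for Murmur3
        }
        return h;
    }

    static int shardId(String id, String routing, int numberOfShards,
                       int numberOfRoutingShards, int routingPartitionSize) {
        // step 2: with custom routing, partitionOffset = hash(id) % routing_partition_size;
        // with default routing it is 0 and the id itself is hashed
        final String effectiveRouting;
        final int partitionOffset;
        if (routing != null) {
            effectiveRouting = routing;
            partitionOffset = Math.floorMod(hash(id), routingPartitionSize);
        } else {
            effectiveRouting = id;
            partitionOffset = 0;
        }
        // step 3: ((hash + offset) % number_of_routing_shards) / routingFactor
        int routingFactor = numberOfRoutingShards / numberOfShards;
        return Math.floorMod(hash(effectiveRouting) + partitionOffset, numberOfRoutingShards)
                / routingFactor;
    }

    public static void main(String[] args) {
        // default routing, 5 shards, number_of_routing_shards defaulted to 5
        int shard = shardId("1", null, 5, 5, 1);
        if (shard < 0 || shard >= 5) throw new AssertionError("shard out of range");
        System.out.println("doc 1 -> shard " + shard);
    }
}
```

Note how Math.floorMod keeps the result non-negative even when the hash is negative, which a plain % would not.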

action.support.replication.ReplicationOperation::handlePrimaryResult

Replication follows Microsoft's PacificA protocol, which has a long-tail cost: the request must wait for the slowest shard to respond.
1) Write the primary shard.
2) After the primary write, re-fetch the GlobalCheckpoint, maxSeqNoOfUpdatesOrDeletes, and replicationGroup so their values are current and usable. Shards that cannot receive this request are marked; a marked shard will not be elected primary if the current primary dies, which avoids data inconsistency.
3) Iterate over the shards in the replication group and write each replica. If a replica write fails, the primary considers that replica faulty and unavailable, reports it to the master, and has it removed. On a successful write, the replica's local checkpoint is updated.
4) Once every shard in the replication group has finished writing, update the local and global checkpoints and return the result.
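Steps 1–4 can be condensed into a sketch. All names below are invented for illustration; the real logic lives in ReplicationOperation, is fully asynchronous, and handles failures. The sketch shows only the checkpoint bookkeeping: the global checkpoint advances to the minimum of the local checkpoints, which is why the slowest shard dominates latency.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of the replication fan-out in steps 1-4 above. Class and
// method names are invented; see ReplicationOperation in the ES source for
// the real (asynchronous, failure-handling) version.
public class ReplicationSketch {
    final Map<String, Long> localCheckpoints = new HashMap<>(); // shard -> local checkpoint

    long replicate(String[] replicationGroup, long seqNo) {
        // 1) write the primary first
        localCheckpoints.put("primary", seqNo);
        // 3) fan out to every replica in the group; a failed replica would be
        // reported to the master and removed from the group (omitted here)
        for (String replica : replicationGroup) {
            localCheckpoints.put(replica, seqNo); // local checkpoint advances on success
        }
        // 4) only after every shard has responded (the long tail) can the
        // global checkpoint advance to the minimum of the local checkpoints
        return localCheckpoints.values().stream().mapToLong(Long::longValue).min().orElse(-1L);
    }

    public static void main(String[] args) {
        ReplicationSketch op = new ReplicationSketch();
        long global = op.replicate(new String[] {"replica-0", "replica-1"}, 42L);
        System.out.println("global checkpoint = " + global);
    }
}
```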

org.elasticsearch.index.engine.InternalEngine::index
1) IDs are auto-generated from the local timestamp; such an id lets the engine call addDocument instead of updateDocument.
2) indexIntoLucene writes the data into Lucene, updating the document's sequence number and primary term, and finally calls Lucene's IndexWriter.
3) Write the translog. If a document fails, its assigned seq_no is recorded as a no-op in both the translog and Lucene.
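The optimization in step 1 — an id auto-generated on this request cannot belong to an existing document, so the engine may append (addDocument) instead of going through the costlier update-by-term path — can be sketched as below. The method and parameter names are invented; the real check lives in InternalEngine and also consults the maximum auto-id timestamp seen so far to rule out duplicates from retried requests.

```java
// Sketch of the auto-ID fast path from step 1. Names are invented for
// illustration; the real logic is in org.elasticsearch.index.engine.InternalEngine.
public class AutoIdFastPath {
    static boolean canUseAddDocument(boolean idAutoGenerated, boolean isRetry,
                                     long autoIdTimestamp, long maxSeenAutoIdTimestamp) {
        // append is safe only if the id was generated for this request, the
        // request is not a retry, and no later auto-id timestamp has been seen
        // (a later timestamp could mean the doc was already indexed once)
        return idAutoGenerated && !isRetry && autoIdTimestamp > maxSeenAutoIdTimestamp;
    }

    public static void main(String[] args) {
        if (!canUseAddDocument(true, false, 100L, 50L)) throw new AssertionError(); // fresh auto-id: append
        if (canUseAddDocument(false, false, 100L, 50L)) throw new AssertionError(); // user-supplied id: update path
        if (canUseAddDocument(true, true, 100L, 50L)) throw new AssertionError();   // retry: must deduplicate
        System.out.println("ok");
    }
}
```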

2. Bulk request dispatch

org.elasticsearch.http.AbstractHttpServerTransport::handleIncomingRequest
  -> org.elasticsearch.rest.RestController::dispatchRequest (public)
  -> RestController::tryAllHandlers -> handlers::retrieveAll -> RestController::dispatchRequest (private)
  -> org.elasticsearch.rest.action.document.RestBulkAction (returns a RestChannelConsumer)
  -> org.elasticsearch.action.support.TransportAction::execute -> ::proceed
  -> org.elasticsearch.action.bulk.TransportBulkAction::doExecute

AbstractHttpServerTransport receives the request

org.elasticsearch.http.AbstractHttpServerTransport

private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {
       Exception badRequestCause = exception;
       // Create a REST request from the incoming Netty request. Building it can fail if a
       // parameter is badly encoded or the Content-Type header is invalid. In those specific
       // cases we try again to build the REST request without the offending input (e.g. drop
       // the Content-Type header, or skip decoding the parameters). Once we have a request,
       // we dispatch it as a bad request carrying the underlying cause.

       
       final RestRequest restRequest;
       {
           RestRequest innerRestRequest;
           try {
               innerRestRequest = RestRequest.request(xContentRegistry, httpRequest, httpChannel);
           } catch (final RestRequest.ContentTypeHeaderException e) {
               badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
               innerRestRequest = requestWithoutContentTypeHeader(httpRequest, httpChannel, badRequestCause);
           } catch (final RestRequest.BadParameterException e) {
               badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
               innerRestRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
           }
           restRequest = innerRestRequest;
       }

       final HttpTracer trace = tracer.maybeTraceRequest(restRequest, exception);
       // Create the channel used to send the response. Construction can fail if any
       // filter_path, human, or pretty parameter value is invalid; we detect this via the
       // IllegalArgumentException thrown by the channel constructor, then retry with a new
       // channel that bypasses parsing those parameter values.

       
       final RestChannel channel;
       {
           RestChannel innerChannel;
           ThreadContext threadContext = threadPool.getThreadContext();
           try {
               innerChannel =
                   new DefaultRestChannel(httpChannel, httpRequest, restRequest, bigArrays, handlingSettings, threadContext, trace);
           } catch (final IllegalArgumentException e) {
               badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
               final RestRequest innerRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
               innerChannel =
                   new DefaultRestChannel(httpChannel, httpRequest, innerRequest, bigArrays, handlingSettings, threadContext, trace);
           }
           channel = innerChannel;
       }

       dispatchRequest(restRequest, channel, badRequestCause);  // continue below ---->
   }

RestController then dispatches the request: the public dispatchRequest finds the handler registered for the request, and the private dispatchRequest calls that handler's handleRequest method to process it.

org.elasticsearch.rest.RestController

    public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
        if (request.rawPath().equals("/favicon.ico")) {
            handleFavicon(request.method(), request.uri(), channel);
            return;
        }
        try {
            tryAllHandlers(request, channel, threadContext);   // the actual dispatch
        } catch (Exception e) {
            try {
                channel.sendResponse(new BytesRestResponse(channel, e));
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.error(() ->
                    new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
            }
        }
    }

private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
       // parse the REST headers and store them in the threadContext
        for (final RestHeaderDefinition restHeader : headersToCopy) {
            final String name = restHeader.getName();
            final List<String> headerValues = request.getAllHeaderValues(name);
            if (headerValues != null && headerValues.isEmpty() == false) {
                final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
                if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
                    channel.sendResponse(
                        BytesRestResponse.
                            createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
                    return;
                } else {
                    threadContext.putHeader(name, String.join(",", distinctHeaderValues));
                }
            }
        }
        // error_trace cannot be used when we disable detailed errors
        // we consume the error_trace parameter first to ensure that it is always consumed
        if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
            channel.sendResponse(
                    BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
            return;
        }
        
        final String rawPath = request.rawPath();  //rawPath:"/_bulk"
        final String uri = request.uri();          //uri:"/_bulk"
        final RestRequest.Method requestMethod;    //requestMethod:POST
        try {
            // Resolves the HTTP method and fails if the method is invalid
            requestMethod = request.method();
            // Loop through all possible handlers, attempting to dispatch the request
            Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);
            while (allHandlers.hasNext()) {
                final RestHandler handler;
                final MethodHandlers handlers = allHandlers.next();
                if (handlers == null) {
                    handler = null;
                } else {
                    handler = handlers.getHandler(requestMethod);
                }
                if (handler == null) {
                  if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
                      return;
                  }
                } else {
                    dispatchRequest(request, channel, handler);   //---->实际执行,继续
                    return;
                }
            }
        } catch (final IllegalArgumentException e) {
            handleUnsupportedHttpMethod(uri, null, channel, getValidHandlerMethodSet(rawPath), e);
            return;
        }
        // If request has not been handled, fallback to a bad request error.
        handleBadRequest(uri, requestMethod, channel);
    }

Handlers are registered up front: when Elasticsearch starts, RestController's registerHandler registers every handler into the per-HTTP-method (GET, PUT, POST, DELETE, etc.) handler tables. When a user request arrives, RestController's getAllHandlers can then look up the matching handler by HTTP method and path.

    public void registerHandler(final RestHandler restHandler) {
        restHandler.routes().forEach(route -> registerHandler(route.getMethod(), route.getPath(), restHandler));
        restHandler.deprecatedRoutes().forEach(route ->
            registerAsDeprecatedHandler(route.getMethod(), route.getPath(), restHandler, route.getDeprecationMessage()));
        restHandler.replacedRoutes().forEach(route -> registerWithDeprecatedHandler(route.getMethod(), route.getPath(),
            restHandler, route.getDeprecatedMethod(), route.getDeprecatedPath()));
    }

    
    protected void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
        if (handler instanceof BaseRestHandler) {
            usageService.addRestHandler((BaseRestHandler) handler);
        }
        final RestHandler maybeWrappedHandler = handlerWrapper.apply(handler);
        handlers.insertOrUpdate(path, new MethodHandlers(path, maybeWrappedHandler, method),
            (mHandlers, newMHandler) -> mHandlers.addMethods(maybeWrappedHandler, method));
    }

Iterator<MethodHandlers> getAllHandlers(@Nullable Map<String, String> requestParamsRef, String rawPath) {
        final Supplier<Map<String, String>> paramsSupplier;
        if (requestParamsRef == null) {
            paramsSupplier = () -> null;
        } else {
            // Between retrieving the correct path, we need to reset the parameters,
            // otherwise parameters are parsed out of the URI that aren't actually handled.
            final Map<String, String> originalParams = new HashMap<>(requestParamsRef);
            paramsSupplier = () -> {
                // PathTrie modifies the request, so reset the params between each iteration
                requestParamsRef.clear();
                requestParamsRef.putAll(originalParams);
                return requestParamsRef;
            };
        }
        // we use rawPath since we don't want to decode it while processing the path resolution
        // so we can handle things like:
        // my_index/my_type/http%3A%2F%2Fwww.google.com
        return handlers.retrieveAll(rawPath, paramsSupplier);
    }
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
        final int contentLength = request.content().length();
        if (contentLength > 0) {
            final XContentType xContentType = request.getXContentType();
         // ... several lines elided ...
        }
        RestChannel responseChannel = channel;
        try {
            if (handler.canTripCircuitBreaker()) {
                inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "");
            } else {
                inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
            }
            //iff we could reserve bytes for the request we need to send the response also over this channel
            responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
            // TODO: Count requests double in the circuit breaker if they need copying?
            if (handler.allowsUnsafeBuffers() == false) {
                request.ensureSafeBuffers();
            }
            handler.handleRequest(request, responseChannel, client);  // ----> continue below
        } catch (Exception e) {
            responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
        }
    }

For bulk operations the matching handler is RestBulkAction, registered when the ES process starts:
org.elasticsearch.rest.action.document.RestBulkAction

    public List routes() {
        return unmodifiableList(asList(
            new Route(POST, "/_bulk"),
            new Route(PUT, "/_bulk"),
            new Route(POST, "/{index}/_bulk"),
            new Route(PUT, "/{index}/_bulk"),
            // Deprecated typed endpoints.
            new Route(POST, "/{index}/{type}/_bulk"),
            new Route(PUT, "/{index}/{type}/_bulk")));
    }

RestBulkAction parses the RestRequest into a BulkRequest and then processes that BulkRequest; the logic lives in prepareRequest:

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        BulkRequest bulkRequest = Requests.bulkRequest();
        String defaultIndex = request.param("index");
        String defaultType = request.param("type");
        if (defaultType == null) {
            defaultType = MapperService.SINGLE_MAPPING_NAME;
        } else {
            deprecationLogger.deprecatedAndMaybeLog("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE);
        }
        String defaultRouting = request.param("routing");
        FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
        String defaultPipeline = request.param("pipeline");
        String waitForActiveShards = request.param("wait_for_active_shards");
        if (waitForActiveShards != null) {
            bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
        }
        bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
        bulkRequest.setRefreshPolicy(request.param("refresh"));
        bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting,
            defaultFetchSourceContext, defaultPipeline, allowExplicitIndex, request.getXContentType());

        return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));  // ---- continue processing
    }


    public void bulk(final BulkRequest request, final ActionListener<BulkResponse> listener) {
        execute(BulkAction.INSTANCE, request, listener);
    }

    public <    Request extends ActionRequest,
                Response extends ActionResponse
            > Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
        return transportAction(action).execute(request, listener);
    }

        
    public final void execute(Task task, Request request, ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }

        if (task != null && request.getShouldStoreResult()) {
            listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
        }

        RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
        requestFilterChain.proceed(task, actionName, request, listener);
    }

TransportAction runs the request through a request filter chain: if a plugin has defined filters for this action, the plugin logic executes first, and then control passes into the TransportAction itself. The chain logic is:
org.elasticsearch.action.support.TransportAction

public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
            int i = index.getAndIncrement();
            try {
                if (i < this.action.filters.length) {
                    this.action.filters[i].apply(task, actionName, request, listener, this);
                } else if (i == this.action.filters.length) {
                    this.action.doExecute(task, request, listener); // run the TransportAction's own logic
                } else {
                    listener.onFailure(new IllegalStateException("proceed was called too many times"));
                }
            } catch(Exception e) {
                logger.trace("Error during transport action execution.", e);
                listener.onFailure(e);
            }
        }

3. Write steps


For a bulk request the concrete executor is a TransportBulkAction instance: the REST layer hands off to the transport layer. TransportBulkAction's doExecute works as follows.

org.elasticsearch.action.bulk.TransportBulkAction::doExecute -> ::executeBulk -> TransportBulkAction.BulkOperation::doRun -> org.elasticsearch.action.support.TransportAction::execute -> TransportShardBulkAction

protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) {
        final long startTime = relativeTime();
        final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

        boolean hasIndexRequestsWithPipelines = false;
        final MetaData metaData = clusterService.state().getMetaData();
        final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
        for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
            IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
            if (indexRequest != null) {
                // resolve whether the request uses an ingest pipeline
                // Each index request needs to be evaluated, because this method also modifies the IndexRequest
                boolean indexRequestHasPipeline = resolvePipelines(actionRequest, indexRequest, metaData);
                hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
            }

            if (actionRequest instanceof IndexRequest) {
                IndexRequest ir = (IndexRequest) actionRequest;
                ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
                if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
                    throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
                }
            }
        }

        if (hasIndexRequestsWithPipelines) {  // pipeline handling; skipped entirely when no pipeline is involved
            // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
            // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
            // this path is never taken.
            try {
                if (Assertions.ENABLED) {
                    final boolean arePipelinesResolved = bulkRequest.requests()
                        .stream()
                        .map(TransportBulkAction::getIndexWriteRequest)
                        .filter(Objects::nonNull)
                        .allMatch(IndexRequest::isPipelineResolved);
                    assert arePipelinesResolved : bulkRequest;
                }
                if (clusterService.localNode().isIngestNode()) {
                    processBulkIndexIngestRequest(task, bulkRequest, listener);
                } else {
                    ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
                }
            } catch (Exception e) {
                listener.onFailure(e);
            }
            return;
        }

        if (needToCheck()) {
            // auto_create_index checking is enabled: check the indices involved and auto-create the missing ones
            // Attempt to create all the indices that we're going to need during the bulk before we start.
            // Step 1: collect all the indices in the request
            final Set<String> indices = bulkRequest.requests.stream()
                    // delete requests should not attempt to create the index (if the index does not
                    // exists), unless an external versioning is used
                .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
                        || request.versionType() == VersionType.EXTERNAL
                        || request.versionType() == VersionType.EXTERNAL_GTE)
                .map(DocWriteRequest::index)
                .collect(Collectors.toSet());
            // Step 2: pick out the indices that can be auto-created and, at the same time, build the
            // list of indices that cannot be created (the index already exists, the name is taken by
            // another index's alias, the settings forbid creation, etc.)
            final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
            Set<String> autoCreateIndices = new HashSet<>();
            ClusterState state = clusterService.state();
            for (String index : indices) {
                boolean shouldAutoCreate;
                try {
                    shouldAutoCreate = shouldAutoCreate(index, state);
                } catch (IndexNotFoundException e) {
                    shouldAutoCreate = false;
                    // indices that cannot be created go into indicesThatCannotBeCreated, a HashMap
                    // keyed by index name whose value is the reason creation failed
                    indicesThatCannotBeCreated.put(index, e);
                }
                if (shouldAutoCreate) {
                    // indices that should be auto-created go into autoCreateIndices
                    autoCreateIndices.add(index);
                }
            }
            // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
            if (autoCreateIndices.isEmpty()) {
                // no index needs creating; execute the bulk directly
                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
            } else {
                // an atomic counter tracks, across threads, how many index creations are still outstanding
                final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
                for (String index : autoCreateIndices) {
                    // create the index
                    createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
                        @Override
                        public void onResponse(CreateIndexResponse result) {
                            if (counter.decrementAndGet() == 0) {
                             // index created; once the counter hits 0 (all indices exist) run the bulk
                                threadPool.executor(ThreadPool.Names.WRITE).execute(
                                    () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
                            }
                        }

                        @Override
                        public void onFailure(Exception e) {
                            // index creation failed: fail every request that targets this index
                            if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
                                // fail all requests involving this index, if create didn't work
                                for (int i = 0; i < bulkRequest.requests.size(); i++) {
                                    DocWriteRequest<?> request = bulkRequest.requests.get(i);
                                    if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                        bulkRequest.requests.set(i, null);
                                    }
                                }
                            }
                            // continue the bulk for the indices that were created successfully
                            if (counter.decrementAndGet() == 0) {
                                executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
                                    inner.addSuppressed(e);
                                    listener.onFailure(inner);
                                }), responses, indicesThatCannotBeCreated);
                            }
                        }
                    });
                }
            }
        } else {
            // no index-existence check required; execute the bulk directly
            executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
        }
    }
    
    private final class BulkOperation extends ActionRunnable<BulkResponse> {
        private final Task task;
        private final BulkRequest bulkRequest;
        private final AtomicArray<BulkItemResponse> responses;
        private final long startTimeNanos;
        private final ClusterStateObserver observer;
        private final Map<String, IndexNotFoundException> indicesThatCannotBeCreated;

        BulkOperation(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, AtomicArray<BulkItemResponse> responses,
                long startTimeNanos, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
            super(listener);
            this.task = task;
            this.bulkRequest = bulkRequest;
            this.responses = responses;
            this.startTimeNanos = startTimeNanos;
            this.indicesThatCannotBeCreated = indicesThatCannotBeCreated;
            this.observer = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext());
        }

        @Override
        protected void doRun() {
            final ClusterState clusterState = observer.setAndGetObservedState();
            if (handleBlockExceptions(clusterState)) {
                return;
            }
            final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
            MetaData metaData = clusterState.metaData();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
                //the request can only be null because we set it to null in the previous step, so it gets ignored
                if (docWriteRequest == null) {
                    continue;
                }
                if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metaData)) {
                    continue;
                }
                Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
                try {
                    switch (docWriteRequest.opType()) {
                        case CREATE:
                        case INDEX:
                            IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                            final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                            MappingMetaData mappingMd = indexMetaData.mappingOrDefault();
                            Version indexCreated = indexMetaData.getCreationVersion();
                            indexRequest.resolveRouting(metaData); // assign routing to the indexRequest from the metaData
                            indexRequest.process(indexCreated, mappingMd, concreteIndex.getName()); // auto-generates the doc id here if the user did not supply one
                            break;
                        case UPDATE:
                            TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(),
                                (UpdateRequest) docWriteRequest);
                            break;
                        case DELETE:
                            docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
                            // check if routing is required, if so, throw error if routing wasn't specified
                            if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName())) {
                                throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
                            }
                            break;
                        default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                    }
                } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
                        docWriteRequest.id(), e);
                    BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
                    responses.set(i, bulkItemResponse);
                    // make sure the request gets never processed again
                    bulkRequest.requests.set(i, null);
                }
            }

            // first, go over all the requests and create a ShardId -> Operations mapping
            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest<?> request = bulkRequest.requests.get(i); // each doc write request in the bulk
                if (request == null) {
                    continue;
                }
                // resolve the concrete index name from the index name or alias
                String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
                // resolve the target shard id for this doc from the routing
                ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
                    request.routing()).shardId();
                // requestsByShard: key is the shard id, value is the list of doc write requests (each wrapped in a BulkItemRequest) for that shard
                List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
                shardRequests.add(new BulkItemRequest(i, request));
            }

            if (requestsByShard.isEmpty()) {
                listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
                    buildTookInMillis(startTimeNanos)));
                return;
            }

            
            final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
            String nodeId = clusterService.localNode().getId();
            // the requests are now split by shard: wrap each shard's requests in a BulkShardRequest and hand it to TransportShardBulkAction
            for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
                final ShardId shardId = entry.getKey();
                final List<BulkItemRequest> requests = entry.getValue();
                BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                        requests.toArray(new BulkItemRequest[requests.size()]));
                bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                bulkShardRequest.timeout(bulkRequest.timeout());
                bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
                if (task != null) {
                    bulkShardRequest.setParentTask(nodeId, task.getId());
                }
                // the actual dispatch: hand off to TransportShardBulkAction
                shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                    @Override
                    public void onResponse(BulkShardResponse bulkShardResponse) {
                        for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                            // we may have no response if item failed
                            if (bulkItemResponse.getResponse() != null) {
                                bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                            }
                            responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        // create failures for all relevant requests
                        for (BulkItemRequest request : requests) {
                            final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                            DocWriteRequest docWriteRequest = request.request();
                            responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
                                    new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    private void finishHim() {
                        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
                            buildTookInMillis(startTimeNanos)));
                    }
                });
            }
        }
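The fan-out above completes through a countdown: an AtomicInteger starts at the number of shard groups, each shard response (or failure) decrements it, and finishHim fires exactly once when it reaches zero. A minimal standalone sketch of this pattern (simplified types and names, not the actual ES classes):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Simplified sketch of the fan-out/countdown used by TransportBulkAction:
// requests are grouped per shard, each group is handled independently, and
// the final listener fires exactly once when every group has answered.
public class BulkFanOut {
    public static void dispatch(Map<Integer, List<String>> requestsByShard,
                                Consumer<String> finalListener) {
        AtomicInteger counter = new AtomicInteger(requestsByShard.size());
        for (Map.Entry<Integer, List<String>> entry : requestsByShard.entrySet()) {
            // in ES this would be shardBulkAction.execute(bulkShardRequest, listener);
            // here we "respond" immediately and just count down
            if (counter.decrementAndGet() == 0) {
                finalListener.accept("all " + requestsByShard.size() + " shards done");
            }
        }
    }
}
```

Because decrementAndGet is atomic, the final listener fires exactly once even when shard responses arrive concurrently on different threads.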

TransportAction::execute marks the write request as "transport"; TransportReplicationAction.ReroutePhase::doRun then forwards it to the primary shard.

TransportAction::execute—>>org.elasticsearch.action.support.replication.TransportReplicationAction.ReroutePhase::doRun

protected void doRun() {
            setPhase(task, "routing");
            final ClusterState state = observer.setAndGetObservedState();
            final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
            if (blockException != null) {
                if (blockException.retryable()) {
                    logger.trace("cluster is blocked, scheduling a retry", blockException);
                    retry(blockException);
                } else {
                    finishAsFailed(blockException);
                }
            } else {
                final IndexMetaData indexMetaData = state.metaData().index(request.shardId().getIndex());
                if (indexMetaData == null) {
                    // ensure that the cluster state on the node is at least as high as the node that decided that the index was there
                    if (state.version() < request.routedBasedOnClusterVersion()) {
                        logger.trace("failed to find index [{}] for request [{}] despite sender thinking it would be here. " +
                                "Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
                            request.shardId().getIndex(), request, state.version(), request.routedBasedOnClusterVersion());
                        retry(new IndexNotFoundException("failed to find index as current cluster state with version [" + state.version() +
                            "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]",
                            request.shardId().getIndexName()));
                        return;
                    } else {
                        finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
                        return;
                    }
                }
                //... several lines omitted ...

                final DiscoveryNode node = state.nodes().get(primary.currentNodeId());   //node that hosts the primary
                if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
                    performLocalAction(state, primary, node, indexMetaData);  //the primary lives on the local node, so execute locally
                } else {
                    performRemoteAction(state, primary, node);
                }
            }
        }

Both performLocalAction and performRemoteAction ultimately call performAction:

       private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node, IndexMetaData indexMetaData) {
            setPhase(task, "waiting_on_primary");
            if (logger.isTraceEnabled()) {
                logger.trace("send action [{}] to local primary [{}] for request [{}] with cluster state version [{}] to [{}] ",
                    transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
            }
            performAction(node, transportPrimaryAction, true,
                new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetaData.primaryTerm(primary.id())));
        }

        private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
            if (state.version() < request.routedBasedOnClusterVersion()) {
                logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state "
                        + "version [{}]] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request,
                    state.version(), request.routedBasedOnClusterVersion());
                retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version ["
                    + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]");
                return;
            } else {
                // chasing the node with the active primary for a second hop requires that we are at least up-to-date with the current
                // cluster state version. This prevents redirect loops between two nodes when a primary was relocated and the
                // relocation target is not aware that it is the active primary shard already.
                request.routedBasedOnClusterVersion(state.version());
            }
            }
            if (logger.isTraceEnabled()) {
                logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", actionName,
                    request.shardId(), request, state.version(), primary.currentNodeId());
            }
            setPhase(task, "rerouted");
            performAction(node, actionName, false, request);
        }

        private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
                                   final TransportRequest requestToPerform) {
            transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {

                @Override
                public Response read(StreamInput in) throws IOException {
                    return newResponseInstance(in);
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }

                @Override
                public void handleResponse(Response response) {
                    finishOnSuccess(response);
                }

                @Override
                public void handleException(TransportException exp) {
                    try {
                        // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                        final Throwable cause = exp.unwrapCause();
                        if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
                            (isPrimaryAction && retryPrimaryException(cause))) {
                            logger.trace(() -> new ParameterizedMessage(
                                    "received an error from node [{}] for request [{}], scheduling a retry",
                                    node.getId(), requestToPerform), exp);
                            retry(exp);
                        } else {
                            finishAsFailed(exp);
                        }
                    } catch (Exception e) {
                        e.addSuppressed(exp);
                        finishWithUnexpectedFailure(e);
                    }
                }
            });
        }
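The handleException branch above retries only for connection-level problems or retryable primary errors, and fails fast otherwise. A toy classifier illustrating that decision (using java.net.ConnectException as a stand-in for ES's ConnectTransportException; not the actual ES code):

```java
// Sketch of the handleException decision in performAction: disconnects and
// retryable primary-side errors go back through the retry path with a fresh
// cluster state; anything else fails the request immediately.
public class FailureClassifier {
    public static String onFailure(Throwable cause, boolean isPrimaryAction,
                                   boolean retryablePrimaryException) {
        if (cause instanceof java.net.ConnectException
                || (isPrimaryAction && retryablePrimaryException)) {
            return "retry";   // schedule a retry against the next cluster state
        }
        return "fail";        // finishAsFailed(exp)
    }
}
```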

        void retry(Exception failure) {
            assert failure != null;
            if (observer.isTimedOut()) {
                // the last attempt was already made when we timed out; do not retry again
                finishAsFailed(failure);
                return;
            }
            setPhase(task, "waiting_for_retry");
            request.onRetry();
            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                @Override
                public void onNewClusterState(ClusterState state) {
                    run();
                }

                @Override
                public void onClusterServiceClose() {
                    finishAsFailed(new NodeClosedException(clusterService.localNode()));
                }

                @Override
                public void onTimeout(TimeValue timeout) {
                    // on timeout, try one more time...
                    run();
                }
            });
        }
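retry does not spin: it parks the request on the ClusterStateObserver and re-runs the whole routing phase only when a new cluster state arrives, with one final attempt on timeout. Stripped of the observer machinery, the control flow amounts to a bounded re-evaluation loop (hypothetical helper, not the ES API):

```java
import java.util.function.BooleanSupplier;

// Sketch of the "retry on new cluster state" pattern: re-evaluate the routing
// decision each time the observed state changes, up to maxAttempts; if no
// attempt succeeds, give up (finishAsFailed in the real code).
public class StateRetry {
    public static boolean runWithRetry(BooleanSupplier attempt, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;   // routing succeeded on this cluster state
            }
            // in ES: observer.waitForNextChange(...) parks here until a new
            // ClusterState arrives; this sketch simply retries immediately
        }
        return false;          // timed out: the last attempt has been made
    }
}
```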

4. Primary shard processing

org.elasticsearch.action.support.replication.ReplicationOperation::execute—>>>TransportShardBulkAction.shardOperationOnPrimary:

org.elasticsearch.action.support.replication.ReplicationOperation — once the node hosting the primary receives the write request from the coordinating node, the actual write logic starts. The entry point is ReplicationOperation::execute, which performs two key steps: first write the primary shard, and only if that succeeds, send the request on to the nodes hosting the replica shards.

    public void execute() throws Exception {
        final String activeShardCountFailure = checkActiveShardCount();
        final ShardRouting primaryRouting = primary.routingEntry();
        final ShardId primaryId = primaryRouting.shardId();
        if (activeShardCountFailure != null) {
            finishAsFailed(new UnavailableShardsException(primaryId,
                "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
            return;
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
        primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure));
    }

    private void handlePrimaryResult(final PrimaryResultT primaryResult) {
        this.primaryResult = primaryResult;
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
            }

            // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
            // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
            // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
            // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
            // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
            // of the sampled replication group, and advanced further than what the given replication group would allow it to.
            // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
            final long globalCheckpoint = primary.computedGlobalCheckpoint();
            // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
            // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
            // on.
            final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
            assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
            // mark shards that cannot receive this request as stale; a stale shard will not be chosen as the new primary if the current primary fails, which avoids data inconsistency
            markUnavailableShardsAsStale(replicaRequest, replicationGroup);
            performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
        }
        primaryResult.runPostReplicationActions(new ActionListener<Void>() {

            @Override
            public void onResponse(Void aVoid) {
                successfulShards.incrementAndGet();
                try {
                    // after the request completes, update the checkpoints so that recovery can resume from the checkpoint instead of rebuilding everything
                    updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
                } finally {
                    decPendingAndFinishIfNeeded();
                }
            }

            @Override
            public void onFailure(Exception e) {
                logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
                // TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
                // go out of sync with the primary
                finishAsFailed(e);
            }
        });
    }

    
    private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
                                   final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
        // for total stats, add number of unassigned shards and
        // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
        totalShards.addAndGet(replicationGroup.getSkippedShards().size());

        final ShardRouting primaryRouting = primary.routingEntry();

        for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
            if (shard.isSameAllocation(primaryRouting) == false) {
                performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
            }
        }
    }
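Across execute, handlePrimaryResult, and performOnReplicas, completion is tracked by a pendingActions counter: it starts at 1 for primary coordination, gains one slot per replica write, and the operation can only finish when every slot has been released. A condensed sketch of that bookkeeping (hypothetical class, synchronous for brevity; the real replica writes are asynchronous):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the pendingActions bookkeeping in ReplicationOperation: one
// "coordination" slot is held while replica writes are fanned out, so the
// operation can only finish after every replica has responded AND the
// primary coordination slot has been released.
public class PendingActions {
    private final AtomicInteger pending = new AtomicInteger(1); // primary coordination slot
    private boolean finished = false;

    public void fanOutToReplicas(int replicaCount) {
        for (int i = 0; i < replicaCount; i++) {
            pending.incrementAndGet();     // one slot per replica write
        }
        for (int i = 0; i < replicaCount; i++) {
            decPendingAndFinishIfNeeded(); // a replica responded
        }
        decPendingAndFinishIfNeeded();     // release the coordination slot
    }

    private void decPendingAndFinishIfNeeded() {
        if (pending.decrementAndGet() == 0) {
            finished = true;               // the real code sends the final response here
        }
    }

    public boolean isFinished() { return finished; }
}
```

Holding the coordination slot until the fan-out is complete is what prevents the response from being sent while replica writes are still being dispatched.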

Now let's look at the key code for the primary write. The entry point is TransportShardBulkAction.shardOperationOnPrimary, which eventually reaches org.elasticsearch.index.engine.InternalEngine::index:

public IndexResult index(Index index) throws IOException {
        assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
        final boolean doThrottle = index.origin().isRecovery() == false;
        try (ReleasableLock releasableLock = readLock.acquire()) {
            ensureOpen();
            assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
            try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
                Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
                lastWriteNanos = index.startTime();
                

                
                final IndexingStrategy plan = indexingStrategyForOperation(index);

                final IndexResult indexResult;
                if (plan.earlyResultOnPreFlightError.isPresent()) {
                    indexResult = plan.earlyResultOnPreFlightError.get();
                    assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
                } else {
                    // generate or register sequence number
                    if (index.origin() == Operation.Origin.PRIMARY) {
                        index = new Index(index.uid(), index.parsedDoc(), generateSeqNoForOperationOnPrimary(index), index.primaryTerm(),
                            index.version(), index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(),
                            index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm());

                        final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
                        if (toAppend == false) {
                            advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo());
                        }
                    } else {
                        markSeqNoAsSeen(index.seqNo());
                    }

                    assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();

                    if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                        indexResult = indexIntoLucene(index, plan);  // write the data into Lucene; this eventually calls Lucene's document-write API
                    } else {
                        indexResult = new IndexResult(
                            plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
                    }
                }
                if (index.origin().isFromTranslog() == false) {
                    final Translog.Location location;
                    if (indexResult.getResultType() == Result.Type.SUCCESS) {
                        location = translog.add(new Translog.Index(index, indexResult)); // write the translog
                    } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                        // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
                        final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
                            index.startTime(), indexResult.getFailure().toString());
                        location = innerNoOp(noOp).getTranslogLocation();
                    } else {
                        location = null;
                    }
                    indexResult.setTranslogLocation(location);
                }
                if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
                    final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                    versionMap.maybePutIndexUnderLock(index.uid().bytes(),
                        new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm()));
                }
                localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
                if (indexResult.getTranslogLocation() == null) {
                    // the op is coming from the translog (and is hence persisted already) or it does not have a sequence number
                    assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
                    localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
                }
                indexResult.setTook(System.nanoTime() - index.startTime());
                indexResult.freeze();
                return indexResult;
            }
        } catch (RuntimeException | IOException e) {
            try {
                if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
                    failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
                } else {
                    maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
                }
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        }
    }

As the code above shows, an ES write goes into Lucene's in-memory structures first and into the translog second, the opposite of the traditional WAL order of log-first, memory-second. The main reason is that Lucene performs additional checks at write time, so a Lucene write can fail. If the translog were written first, ES would have to handle operations that are recorded in the translog but can never be applied to Lucene; writing Lucene first avoids that problem.
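The Lucene-first ordering can be sketched as follows: index into memory first, then log either the real operation or, on a per-document failure, a no-op for the already-assigned seq_no so that sequence numbers stay gap-free on replay (toy model with made-up names, not the ES API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of "Lucene first, translog second": the document is validated and
// indexed in memory first; the translog then records either the real operation
// (on success) or a no-op for the already-assigned seq_no (on failure), so the
// sequence-number stream has no gaps when the translog is replayed.
public class WriteOrder {
    public final List<String> index = new ArrayList<>();    // stands in for Lucene
    public final List<String> translog = new ArrayList<>(); // stands in for the translog

    public boolean write(long seqNo, String doc, Predicate<String> luceneAccepts) {
        if (luceneAccepts.test(doc)) {
            index.add(doc);
            translog.add("index#" + seqNo);   // real operation logged after a successful index
            return true;
        } else {
            translog.add("no-op#" + seqNo);   // per-document failure recorded as a no-op
            return false;
        }
    }
}
```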


Writing to Lucene

    private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
        throws IOException {
        assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
        assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
        assert plan.indexIntoLucene || plan.addStaleOpToLucene;
        
        index.parsedDoc().updateSeqID(index.seqNo(), index.primaryTerm());
        index.parsedDoc().version().setLongValue(plan.versionForIndexing);
        try {
            if (plan.addStaleOpToLucene) {
                addStaleDocs(index.docs(), indexWriter);
            } else if (plan.useLuceneUpdateDocument) {
                assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true);
                updateDocs(index.uid(), index.docs(), indexWriter);
            } else {
                // document does not exist, we can optimize for create, but double check if assertions are running
                assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
                addDocs(index.docs(), indexWriter);
            }
            return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
        } catch (Exception ex) {
            if (ex instanceof AlreadyClosedException == false &&
                indexWriter.getTragicException() == null && treatDocumentFailureAsTragicError(index) == false) {
                
                return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), index.seqNo());
            } else {
                throw ex;
            }
        }
    }