------------------------------文档开始--------------------------------------------------
启动远程调试模式:./gradlew run --debug-jvm
1.数据准备changjing@changjingdeMacBook-Air test % cat a.json
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
curl -u elastic:password -XPOST http://localhost:9200/_bulk --data-binary @a.json -H 'Content-Type: application/json'
构造一个 shardID
1)由于自定义 routing 可能影响文档分布的均匀性,因此引入了一个 partitionOffset 参数。
2)当使用自定义 routing 时,将 id 的 hash 值对配置中的 routing_partition_size 取模作为 partitionOffset。使用默认 routing 时 partitionOffset 值为 0
3)shardID 计算公式 ((hash(id 或 routing 参数) + partitionOffset) % number_of_routing_shards) / (number_of_routing_shards/number_of_shards),number_of_shards 是配置文件中设置的总的 shard 的数量,number_of_routing_shards 也可以进行配置,默认与 number_of_shards 相同
4)Hash 函数使用 Murmur3,Murmur3 是非加密散列函数,不适用于加密目的
action.support.replication.ReplicationOperation::handlePrimaryResult
遵循微软的 PacificA协议,存在长尾,即需等待响应最慢的shard返回
1)写primary shard
2)写完primary后,重新获取GlobalCheckpoint、maxSeqNoOfUpdatesOrDeletes、replicationGroup,确保值最新可用。标记无法接收本次请求的分片,被标记的分片在主分片挂掉后不会被选为新的主分片,避免数据不一致
3)遍历复制组内shard,写副本shard,若写入失败,primary认为该replica shard发生故障不可用,将会向master汇报并移除该replica。写入成功更新本地检查点。
4)等待复制组内所有shard写入完成后,更新本地检查点及全局检查点,返回请求结果
index.engine.InternalEngine::index
1)基于本地timestamp自动生成ID,利用该id使用addDocument来替代updateDocument。
2)调用indexIntoLucene将数据写入lucene, 更新文档的sequence number和primary term,最终调用lucene的IndexWriter写入数据
3)写translog。如果有文档失败,将生成的seq_no在translog and Lucene中记录为空操作,即no-op
org.elasticsearch.http.AbstractHttpServerTransport::handleIncomingRequest—>> public org.elasticsearch.rest.RestController::dispatchRequest(public)—>>::tryAllHandlers—>>handlers::retrieveAll—>>::dispatchRequest(private)—>>
///org.elasticsearch.rest.action.document::RestChannelConsumer–>>>///org.elasticsearch.action.support.TransportAction—>::execute—>::proceed///org.elasticsearch.action.bulk.TransportBulkAction::doExecute
AbstractHttpServerTransport接收请求
org.elasticsearch.http.AbstractHttpServerTransport
// Entry point for a raw HTTP request: builds a RestRequest and a RestChannel from the Netty
// objects, degrading gracefully on malformed input, then hands both to the dispatcher together
// with any accumulated bad-request cause.
private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {
Exception badRequestCause = exception;
// Create a REST request from the incoming Netty request. Construction can fail on a badly
// encoded parameter or an invalid Content-Type header; in those specific cases we retry
// building the request without the offending input (drop the Content-Type header, or skip
// parameter decoding). Once we have a request, we dispatch it as a bad request carrying the
// underlying exception that made us treat it as bad.
final RestRequest restRequest;
{
RestRequest innerRestRequest;
try {
innerRestRequest = RestRequest.request(xContentRegistry, httpRequest, httpChannel);
} catch (final RestRequest.ContentTypeHeaderException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
innerRestRequest = requestWithoutContentTypeHeader(httpRequest, httpChannel, badRequestCause);
} catch (final RestRequest.BadParameterException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
innerRestRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
}
restRequest = innerRestRequest;
}
final HttpTracer trace = tracer.maybeTraceRequest(restRequest, exception);
// Create the channel used to send the response. Construction can fail when any of the
// filter_path, human or pretty parameters has an invalid value; we detect that via the
// IllegalArgumentException thrown by the channel constructor and retry with a request that
// bypasses parsing those parameter values.
final RestChannel channel;
{
RestChannel innerChannel;
ThreadContext threadContext = threadPool.getThreadContext();
try {
innerChannel =
new DefaultRestChannel(httpChannel, httpRequest, restRequest, bigArrays, handlingSettings, threadContext, trace);
} catch (final IllegalArgumentException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
final RestRequest innerRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
innerChannel =
new DefaultRestChannel(httpChannel, httpRequest, innerRequest, bigArrays, handlingSettings, threadContext, trace);
}
channel = innerChannel;
}
dispatchRequest(restRequest, channel, badRequestCause); // continue: RestController.dispatchRequest
}
RestController执行调度请求,public dispatchRequest会根据request找到其对应的handler,private dispatchRequest中会调用handler的handleRequest方法处理请求。
org.elasticsearch.rest.RestController
// Dispatches a REST request: short-circuits the favicon path, otherwise walks the registered
// handlers. Any exception escaping the handlers is turned into an error response; a failure
// while sending that response is logged with the original exception suppressed.
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
    if ("/favicon.ico".equals(request.rawPath())) {
        handleFavicon(request.method(), request.uri(), channel);
        return;
    }
    try {
        tryAllHandlers(request, channel, threadContext); // actual handler resolution + execution
    } catch (Exception outer) {
        try {
            channel.sendResponse(new BytesRestResponse(channel, outer));
        } catch (Exception sendFailure) {
            sendFailure.addSuppressed(outer);
            logger.error(() ->
                new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), sendFailure);
        }
    }
}
// Resolves the handler registered for the request's method and path and dispatches to it.
// First copies whitelisted REST headers into the thread context, rejects error_trace when
// detailed errors are disabled, then iterates the candidate handlers for the raw path.
// Falls back to method-not-supported / no-handler / bad-request responses as appropriate.
// NOTE: the generic type arguments below (List<String>, Iterator<MethodHandlers>) were
// stripped by HTML extraction in the original snippet and are restored here.
private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
    // Copy the registered REST headers into the thread context.
    for (final RestHeaderDefinition restHeader : headersToCopy) {
        final String name = restHeader.getName();
        final List<String> headerValues = request.getAllHeaderValues(name);
        if (headerValues != null && headerValues.isEmpty() == false) {
            final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
            if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
                channel.sendResponse(BytesRestResponse.
                    createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
                return;
            } else {
                threadContext.putHeader(name, String.join(",", distinctHeaderValues));
            }
        }
    }
    // error_trace cannot be used when we disable detailed errors
    // we consume the error_trace parameter first to ensure that it is always consumed
    if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
        channel.sendResponse(
            BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
        return;
    }
    final String rawPath = request.rawPath(); // e.g. "/_bulk"
    final String uri = request.uri();         // e.g. "/_bulk"
    final RestRequest.Method requestMethod;   // e.g. POST
    try {
        // Resolves the HTTP method and fails if the method is invalid
        requestMethod = request.method();
        // Loop through all handlers registered for rawPath, attempting to dispatch the request
        Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);
        while (allHandlers.hasNext()) {
            final RestHandler handler;
            final MethodHandlers handlers = allHandlers.next();
            if (handlers == null) {
                handler = null;
            } else {
                handler = handlers.getHandler(requestMethod);
            }
            if (handler == null) {
                if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
                    return;
                }
            } else {
                dispatchRequest(request, channel, handler); // actual dispatch — continues in the private overload
                return;
            }
        }
    } catch (final IllegalArgumentException e) {
        handleUnsupportedHttpMethod(uri, null, channel, getValidHandlerMethodSet(rawPath), e);
        return;
    }
    // If request has not been handled, fallback to a bad request error.
    handleBadRequest(uri, requestMethod, channel);
}
提前注册:ES在启动时会通过RestController的registerHandler方法,提前把所有需要使用的handler注册到对应的hander列表中,包括http请求方法(GET、PUT、POST、DELETE等)的handlers列表。当用户请求到达时,就可以通过RestController的getAllHandlers方法,并根据http请求方法和路径取出对应的handler
// Registers every route a handler exposes: its primary routes, its deprecated routes (which
// emit a deprecation warning when hit), and its replaced routes (registered under both the
// new and the deprecated method/path).
public void registerHandler(final RestHandler restHandler) {
    restHandler.routes().forEach(r -> registerHandler(r.getMethod(), r.getPath(), restHandler));
    restHandler.deprecatedRoutes().forEach(r ->
        registerAsDeprecatedHandler(r.getMethod(), r.getPath(), restHandler, r.getDeprecationMessage()));
    restHandler.replacedRoutes().forEach(r -> registerWithDeprecatedHandler(r.getMethod(), r.getPath(),
        restHandler, r.getDeprecatedMethod(), r.getDeprecatedPath()));
}
// Registers a single (method, path) route for a handler: records it with the usage service,
// applies the configured handler wrapper, and inserts/merges it into the path trie.
// Fix: the copied snippet had `baseRestHandler`, which does not compile — the actual
// Elasticsearch class is `BaseRestHandler`.
protected void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
    if (handler instanceof BaseRestHandler) {
        usageService.addRestHandler((BaseRestHandler) handler);
    }
    final RestHandler maybeWrappedHandler = handlerWrapper.apply(handler);
    handlers.insertOrUpdate(path, new MethodHandlers(path, maybeWrappedHandler, method),
        (mHandlers, newMHandler) -> mHandlers.addMethods(maybeWrappedHandler, method));
}
IteratorgetAllHandlers(@Nullable Map requestParamsRef, String rawPath) { final Supplier
// Executes a resolved handler: accounts the request's content length against the in-flight
// requests circuit breaker (tripping it if allowed), then invokes the handler. Failures are
// sent back as error responses over the (possibly wrapped) channel.
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
final int contentLength = request.content().length();
if (contentLength > 0) {
final XContentType xContentType = request.getXContentType();
// ... several lines omitted in this excerpt ...
}
RestChannel responseChannel = channel;
try {
if (handler.canTripCircuitBreaker()) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "");
} else {
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
}
// bytes reserved for the request must also cover sending the response over this channel
//iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
// count requests double in the breaker when they need copying?
// TODO: Count requests double in the circuit breaker if they need copying?
if (handler.allowsUnsafeBuffers() == false) {
request.ensureSafeBuffers();
}
handler.handleRequest(request, responseChannel, client); // continue: handler-specific logic, e.g. RestBulkAction
} catch (Exception e) {
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
}
}
对于bulk操作,其请求对应的handler是RestBulkAction,es进程启动时注册
org.elasticsearch.rest.action.document.RestBulkAction
public Listroutes() { return unmodifiableList(asList( new Route(POST, "/_bulk"), new Route(PUT, "/_bulk"), new Route(POST, "/{index}/_bulk"), new Route(PUT, "/{index}/_bulk"), // Deprecated typed endpoints. new Route(POST, "/{index}/{type}/_bulk"), new Route(PUT, "/{index}/{type}/_bulk"))); }
RestBulkAction会将RestRequest解析并转化为BulkRequest,然后再对BulkRequest做处理,这块的逻辑在prepareRequest方法中
// Parses the REST-level bulk request into a BulkRequest: default index/type/routing/pipeline,
// fetch-source context, wait_for_active_shards, timeout and refresh policy, then the NDJSON
// body itself. Returns a consumer that executes the bulk through the NodeClient.
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
BulkRequest bulkRequest = Requests.bulkRequest();
String defaultIndex = request.param("index");
String defaultType = request.param("type");
if (defaultType == null) {
defaultType = MapperService.SINGLE_MAPPING_NAME;
} else {
// typed bulk endpoints are deprecated — warn when a type is supplied
deprecationLogger.deprecatedAndMaybeLog("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
String defaultRouting = request.param("routing");
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
String defaultPipeline = request.param("pipeline");
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting,
defaultFetchSourceContext, defaultPipeline, allowExplicitIndex, request.getXContentType());
return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel)); // continue: NodeClient.bulk
}
// Executes a bulk request by dispatching the bulk action on this client.
// Restored the HTML-eaten generic on the listener (`ActionListener<BulkResponse>`).
public void bulk(final BulkRequest request, final ActionListener<BulkResponse> listener) {
    execute(BulkAction.INSTANCE, request, listener);
}
// Runs the transport action registered for the given action type on the local node and
// returns the Task tracking the execution.
// Restored the HTML-eaten generics (`ActionType<Response>`, `ActionListener<Response>`).
public <Request extends ActionRequest,
        Response extends ActionResponse
        > Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
    return transportAction(action).execute(request, listener);
}
// Validates the request, optionally wraps the listener so the task result is stored before
// the caller is notified, then runs the request through the action filter chain (which ends
// in doExecute). Restored the HTML-eaten generics on the listener and the filter chain.
public final void execute(Task task, Request request, ActionListener<Response> listener) {
    ActionRequestValidationException validationException = request.validate();
    if (validationException != null) {
        listener.onFailure(validationException);
        return;
    }
    if (task != null && request.getShouldStoreResult()) {
        // persist the task result before notifying the original listener
        listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
    }
    RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
    requestFilterChain.proceed(task, actionName, request, listener);
}
TransportAction会调用一个请求过滤链来处理请求,如果相关的插件定义了对该action的过滤处理,则先会执行插件的处理逻辑,然后再进入TransportAction的处理逻辑,过滤链的处理逻辑如下
org.elasticsearch.action.support.TransportAction
public void proceed(Task task, String actionName, Request request, ActionListenerlistener) { int i = index.getAndIncrement(); try { if (i < this.action.filters.length) { this.action.filters[i].apply(task, actionName, request, listener, this); } else if (i == this.action.filters.length) { this.action.doExecute(task, request, listener); // 执行TransportAction的处理逻辑 } else { listener.onFailure(new IllegalStateException("proceed was called too many times")); } } catch(Exception e) { logger.trace("Error during transport action execution.", e); listener.onFailure(e); } }
3.写入步骤
org.elasticsearch.action.bulk.TransportBulkAction.doExecute—>
对于Bulk请求,具体的执行对象是TransportBulkAction的实例,Rest层转化为Transport层的处理. TransportBulkAction的doExecute处理逻辑如下
org.elasticsearch.action.bulk.TransportBulkAction::doExecute—>>::executeBulk—>>TransportBulkAction.BulkOperation::doRun—>>org.elasticsearch.action.support.TransportAction::execute—>>TransportShardBulkAction
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListenerlistener) { final long startTime = relativeTime(); final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); boolean hasIndexRequestsWithPipelines = false; final metaData metaData = clusterService.state().getmetaData(); final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion(); for (DocWriteRequest> actionRequest : bulkRequest.requests) { IndexRequest indexRequest = getIndexWriteRequest(actionRequest); if (indexRequest != null) { //解析请求是否存在Pipeline,每个index请求都需要评估,因为这个方法同时修改了IndexRequest // Each index request needs to be evaluated, because this method also modifies the IndexRequest boolean indexRequestHasPipeline = resolvePipelines(actionRequest, indexRequest, metaData); hasIndexRequestsWithPipelines |= indexRequestHasPipeline; } if (actionRequest instanceof IndexRequest) { IndexRequest ir = (IndexRequest) actionRequest; ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion); if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) { throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally"); } } } if (hasIndexRequestsWithPipelines) { //pipeline处理逻辑,没有pipeline直接跳过 // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method, // this path is never taken. 
try { if (Assertions.ENABLED) { final boolean arePipelinesResolved = bulkRequest.requests() .stream() .map(TransportBulkAction::getIndexWriteRequest) .filter(Objects::nonNull) .allMatch(IndexRequest::isPipelineResolved); assert arePipelinesResolved : bulkRequest; } if (clusterService.localNode().isIngestNode()) { processBulkIndexIngestRequest(task, bulkRequest, listener); } else { ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener); } } catch (Exception e) { listener.onFailure(e); } return; } if (needToCheck()) { //配置中设定了检查请求涉及的 index,并自动创建缺失的index // Attempt to create all the indices that we're going to need during the bulk before we start. //Step 1: 收集请求中涉及的 index // Step 1: collect all the indices in the request final Set indices = bulkRequest.requests.stream() // delete requests should not attempt to create the index (if the index does not // exists), unless an external versioning is used .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE || request.versionType() == VersionType.EXTERNAL || request.versionType() == VersionType.EXTERNAL_GTE) .map(DocWriteRequest::index) .collect(Collectors.toSet()); //筛选出可以自动创建的 index ,同时创建不能自动创建的index列表。(由于要创建的 index 已存在或者名字被其他 index 的 alias 占用或者配置不支持等原因而不能创建) final Map indicesThatCannotBeCreated = new HashMap<>(); Set autoCreateIndices = new HashSet<>(); ClusterState state = clusterService.state(); for (String index : indices) { boolean shouldAutoCreate; try { shouldAutoCreate = shouldAutoCreate(index, state); } catch (IndexNotFoundException e) { shouldAutoCreate = false; // 不可创建的 index 存放在 indicesThatCannotBeCreated,这是个 HashMap,key 是 index 的名字,value 是不可创建的原因 indicesThatCannotBeCreated.put(index, e); } if (shouldAutoCreate) { // 需要自动创建的 index 放在 autoCreateIndices autoCreateIndices.add(index); } } //Step 3: 创建需要自动创建的 index,创建完成后再继续执行请求的操作 // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back. 
if (autoCreateIndices.isEmpty()) { 无需创建 index,直接执行请求的操作 executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); } else { // 这里使用了一个原子性的计数器,用于在多线程执行的环境下记录已成功创建的 index 的数量 final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); for (String index : autoCreateIndices) { // 执行创建 index createIndex(index, bulkRequest.timeout(), new ActionListener () { @Override public void onResponse(CreateIndexResponse result) { if (counter.decrementAndGet() == 0) { // 成功创建 index,当计数器为 0,即所有 index 都已创建完成后执行请求的任务 threadPool.executor(ThreadPool.Names.WRITE).execute( () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)); } } @Override public void onFailure(Exception e) { // 创建 index 失败,将涉及这个 index 的请求响应设为失败 if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) { // fail all requests involving this index, if create didn't work for (int i = 0; i < bulkRequest.requests.size(); i++) { DocWriteRequest> request = bulkRequest.requests.get(i); if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { bulkRequest.requests.set(i, null); } } } // 对成功创建的 index 继续执行请求的操作 if (counter.decrementAndGet() == 0) { executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { inner.addSuppressed(e); listener.onFailure(inner); }), responses, indicesThatCannotBeCreated); } } }); } } } else { // 不需要检查 index 是否存在,直接执行请求的操作 executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap()); } }
private final class BulkOperation extends ActionRunnable {
private final Task task;
private final BulkRequest bulkRequest;
private final AtomicArray responses;
private final long startTimeNanos;
private final ClusterStateObserver observer;
private final Map indicesThatCannotBeCreated;
BulkOperation(Task task, BulkRequest bulkRequest, ActionListener listener, AtomicArray responses,
long startTimeNanos, Map indicesThatCannotBeCreated) {
super(listener);
this.task = task;
this.bulkRequest = bulkRequest;
this.responses = responses;
this.startTimeNanos = startTimeNanos;
this.indicesThatCannotBeCreated = indicesThatCannotBeCreated;
this.observer = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext());
}
@Override
protected void doRun() {
final ClusterState clusterState = observer.setAndGetObservedState();
if (handleBlockExceptions(clusterState)) {
return;
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameexpressionResolver);
metaData metaData = clusterState.metaData();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest> docWriteRequest = bulkRequest.requests.get(i);
//the request can only be null because we set it to null in the previous step, so it gets ignored
if (docWriteRequest == null) {
continue;
}
if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metaData)) {
continue;
}
Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
try {
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
final IndexmetaData indexmetaData = metaData.index(concreteIndex); // 根据metaData对indexRequest的routing赋值
MappingmetaData mappingMd = indexmetaData.mappingOrDefault();
Version indexCreated = indexmetaData.getCreationVersion();
indexRequest.resolveRouting(metaData); // 根据metaData对indexRequest的routing赋值
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName()); // 这里,如果用户没有指定doc id,则会自动生成
break;
case UPDATE:
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(),
(UpdateRequest) docWriteRequest);
break;
case DELETE:
docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
}
break;
default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(i, null);
}
}
//将请求按照 shard 分组,构造一个 shardID 和对应 shardID 所有请求的 hashmap
// first, go over all the requests and create a ShardId -> Operations mapping
Map> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest> request = bulkRequest.requests.get(i); // 从bulk请求中得到每个doc写入请求
if (request == null) {
continue;
}
根据 index 名或者 alias 名获取真正的 index 名
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
// // 根据路由,找出doc写入的目标shard id
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
request.routing()).shardId();
// requestsByShard的key是shard id,value是对应的单个doc写入请求(会被封装成BulkItemRequest)的集合
List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}
if (requestsByShard.isEmpty()) {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)));
return;
}
final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
String nodeId = clusterService.localNode().getId();
//将请求按shard进行了拆分,接下来会将每个shard对应的所有请求封装为BulkShardRequest并交由TransportShardBulkAction来处理
for (Map.Entry> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
bulkShardRequest.timeout(bulkRequest.timeout());
bulkShardRequest.routedbasedOnClusterVersion(clusterState.version());
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
}
//实际执行,进入TransportShardBulkAction进行处理
shardBulkAction.execute(bulkShardRequest, new ActionListener() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
DocWriteRequest> docWriteRequest = request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
}
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)));
}
});
}
}
利用 TransportAction::execute 将写请求标记为 “transport”, 由 TransportReplicationAction.ReroutePhase::doRun 转发给主分片
TransportAction::execute–>>>org.elasticsearch.action.support.replication::doRun
// Routing phase on the coordinating node: after checking cluster blocks and index existence,
// resolves the primary shard and either executes locally (primary on this node) or forwards
// the request to the node hosting the primary.
// Fixes to the copied snippet: restored `IndexMetaData` / `routedBasedOnClusterVersion`
// casing, and corrected the trailing comment on performLocalAction (the condition compares
// the primary's node with the LOCAL node, not with itself).
protected void doRun() {
    setPhase(task, "routing");
    final ClusterState state = observer.setAndGetObservedState();
    final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
    if (blockException != null) {
        if (blockException.retryable()) {
            logger.trace("cluster is blocked, scheduling a retry", blockException);
            retry(blockException);
        } else {
            finishAsFailed(blockException);
        }
    } else {
        final IndexMetaData indexMetaData = state.metaData().index(request.shardId().getIndex());
        if (indexMetaData == null) {
            // ensure that the cluster state on the node is at least as high as the node that decided that the index was there
            if (state.version() < request.routedBasedOnClusterVersion()) {
                logger.trace("failed to find index [{}] for request [{}] despite sender thinking it would be here. " +
                    "Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
                    request.shardId().getIndex(), request, state.version(), request.routedBasedOnClusterVersion());
                retry(new IndexNotFoundException("failed to find index as current cluster state with version [" + state.version() +
                    "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]",
                    request.shardId().getIndexName()));
                return;
            } else {
                finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
                return;
            }
        }
        // ... several lines omitted in this excerpt (index state / primary resolution) ...
        final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); // node currently hosting the primary
        if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
            performLocalAction(state, primary, node, indexMetaData); // primary lives on this node — execute locally
        } else {
            performRemoteAction(state, primary, node);
        }
    }
}
performLocalAction/performRemoteAction实际执行函数均为performAction
// Executes the primary action on this node (the primary shard is local), tagging the request
// with the primary's allocation id and primary term so the target can reject stale routing.
// Fix: restored `IndexMetaData` casing from the mangled `IndexmetaData`.
private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node, IndexMetaData indexMetaData) {
    setPhase(task, "waiting_on_primary");
    if (logger.isTraceEnabled()) {
        logger.trace("send action [{}] to local primary [{}] for request [{}] with cluster state version [{}] to [{}] ",
            transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
    }
    performAction(node, transportPrimaryAction, true,
        new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetaData.primaryTerm(primary.id())));
}
// Forwards the request to the node hosting the primary, first checking that the local cluster
// state is not older than the state the sender routed on (otherwise schedule a retry), and
// recording the routed-on version to prevent redirect loops during primary relocation.
// Fix: restored `routedBasedOnClusterVersion` casing from the mangled lowercase form.
private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
    if (state.version() < request.routedBasedOnClusterVersion()) {
        logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state "
            + "version [{}]] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request,
            state.version(), request.routedBasedOnClusterVersion());
        retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version ["
            + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]");
        return;
    } else {
        // chasing the node with the active primary for a second hop requires that we are at least up-to-date with the
        // current cluster state version: this prevents redirect loops between two nodes when a primary was relocated and
        // the relocation target is not aware that it is the active primary shard already.
        request.routedBasedOnClusterVersion(state.version());
    }
    if (logger.isTraceEnabled()) {
        logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", actionName,
            request.shardId(), request, state.version(), primary.currentNodeId());
    }
    setPhase(task, "rerouted");
    performAction(node, actionName, false, request);
}
// Sends the request to the given node and wires up the response handling: success finishes
// the operation; disconnects, node-closed, and (for primary actions) retryable primary
// exceptions schedule a retry; anything else fails the request.
// Fix: restored the HTML-eaten generic on the handler (`TransportResponseHandler<Response>`).
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
                           final TransportRequest requestToPerform) {
    transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
        @Override
        public Response read(StreamInput in) throws IOException {
            return newResponseInstance(in);
        }

        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }

        @Override
        public void handleResponse(Response response) {
            finishOnSuccess(response);
        }

        @Override
        public void handleException(TransportException exp) {
            try {
                // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                final Throwable cause = exp.unwrapCause();
                if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
                    (isPrimaryAction && retryPrimaryException(cause))) {
                    logger.trace(() -> new ParameterizedMessage(
                        "received an error from node [{}] for request [{}], scheduling a retry",
                        node.getId(), requestToPerform), exp);
                    retry(exp);
                } else {
                    finishAsFailed(exp);
                }
            } catch (Exception e) {
                e.addSuppressed(exp);
                finishWithUnexpectedFailure(e);
            }
        }
    });
}
// Schedules a retry of the routing phase once the cluster state changes; if the observer has
// already timed out, the request is failed instead (the final attempt was already made).
// Fix: restored `TimeValue` casing from the mangled `Timevalue` (won't compile otherwise).
void retry(Exception failure) {
    assert failure != null;
    if (observer.isTimedOut()) {
        // the final attempt was already performed on timeout — do not retry again
        finishAsFailed(failure);
        return;
    }
    setPhase(task, "waiting_for_retry");
    request.onRetry();
    observer.waitForNextChange(new ClusterStateObserver.Listener() {
        @Override
        public void onNewClusterState(ClusterState state) {
            run();
        }

        @Override
        public void onClusterServiceClose() {
            finishAsFailed(new NodeClosedException(clusterService.localNode()));
        }

        @Override
        public void onTimeout(TimeValue timeout) {
            // Try one more time...
            run();
        }
    });
}
4. primary处理
org.elasticsearch.action.support.replication.ReplicationOperation::execute—>>>TransportShardBulkAction.shardOperationOnPrimary:
org.elasticsearch.action.support.replication.ReplicationOperation primary所在的node收到协调节点发过来的写入请求后,开始正式执行写入的逻辑,写入执行的入口是在ReplicationOperation类的execute方法,该方法中执行的两个关键步骤是,首先写主shard,如果主shard写入成功,再将写入请求发送到从shard所在的节点。
// Kicks off the replication operation: verifies the configured wait_for_active_shards
// requirement, then performs the write on the primary; the result is handled by
// handlePrimaryResult (success) or the result listener (failure).
public void execute() throws Exception {
    final String activeShardCountFailure = checkActiveShardCount();
    if (activeShardCountFailure != null) {
        final ShardId primaryId = primary.routingEntry().shardId();
        finishAsFailed(new UnavailableShardsException(primaryId,
            "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
        return;
    }
    totalShards.incrementAndGet();
    pendingActions.incrementAndGet(); // hold one pending action until primary coordination finishes
    primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure));
}
// Handles the primary-shard write result: samples the global checkpoint, the max seq_no of
// updates/deletes, and the replication group (in that order, for recovery correctness), marks
// unreachable copies as stale, replicates to the remaining copies, then runs post-replication
// actions and advances checkpoints.
// Fix: restored the HTML-eaten generic on the listener (`ActionListener<Void>`).
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
    this.primaryResult = primaryResult;
    final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
    if (replicaRequest != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
        }
        // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
        // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
        // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
        // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
        // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
        // of the sampled replication group, and advanced further than what the given replication group would allow it to.
        // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
        final long globalCheckpoint = primary.computedGlobalCheckpoint();
        // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
        // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
        // on.
        final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
        assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
        final ReplicationGroup replicationGroup = primary.getReplicationGroup();
        // mark shards unable to receive this request as stale so they cannot be promoted to
        // primary after a failure, which would otherwise risk data inconsistency
        markUnavailableShardsAsStale(replicaRequest, replicationGroup);
        performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
    }
    primaryResult.runPostReplicationActions(new ActionListener<Void>() {
        @Override
        public void onResponse(Void aVoid) {
            successfulShards.incrementAndGet();
            try {
                // advance local/global checkpoints once the request completes, so recovery can
                // resume from the checkpoint instead of replaying or rebuilding everything
                updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
            } finally {
                decPendingAndFinishIfNeeded();
            }
        }

        @Override
        public void onFailure(Exception e) {
            logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
            // TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
            // go out of sync with the primary
            finishAsFailed(e);
        }
    });
}
/**
 * Sends the replica request — together with the previously sampled global
 * checkpoint and max_seq_no_of_updates — to every replication target in the
 * group, skipping the primary's own allocation.
 */
private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
                               final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
    // Skipped shards (unassigned copies, or initializing targets whose engine has not
    // been opened by recovery yet) still count towards the total for stats purposes.
    totalShards.addAndGet(replicationGroup.getSkippedShards().size());
    final ShardRouting primaryRouting = primary.routingEntry();
    for (final ShardRouting target : replicationGroup.getReplicationTargets()) {
        if (target.isSameAllocation(primaryRouting)) {
            continue; // the primary copy itself has already been written
        }
        performOnReplica(target, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
    }
}
我们来看写primary的关键代码。写primary的入口函数为TransportShardBulkAction.shardOperationOnPrimary,最终会调用org.elasticsearch.index.engine.InternalEngine::index:
// Indexes a single document into this engine: assigns/registers the sequence
// number, writes into Lucene, then appends to the translog (note: Lucene first,
// translog second), and finally updates the local checkpoint tracker.
public IndexResult index(Index index) throws IOException {
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
// Recovery-origin operations are never throttled.
final boolean doThrottle = index.origin().isRecovery() == false;
try (ReleasableLock releasableLock = readLock.acquire()) {
ensureOpen();
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
// Per-uid lock serializes concurrent operations on the same document id.
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
lastWriteNanos = index.startTime();
final IndexingStrategy plan = indexingStrategyForOperation(index);
final IndexResult indexResult;
if (plan.earlyResultOnPreFlightError.isPresent()) {
// Pre-flight check (e.g. version conflict) already failed — short-circuit.
indexResult = plan.earlyResultOnPreFlightError.get();
assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
} else {
// generate or register sequence number
if (index.origin() == Operation.Origin.PRIMARY) {
// On the primary a fresh seq_no is generated; the operation is rebuilt with it.
index = new Index(index.uid(), index.parsedDoc(), generateSeqNoForOperationOnPrimary(index), index.primaryTerm(),
index.version(), index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(),
index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm());
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdatedocument == false;
if (toAppend == false) {
// This op updates/deletes an existing doc, so advance max_seq_no_of_updates.
advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo());
}
} else {
// On replicas the primary-assigned seq_no is only registered as seen.
markSeqNoAsSeen(index.seqNo());
}
assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
indexResult = indexIntoLucene(index, plan); // write into Lucene; eventually calls Lucene's document-write API
} else {
indexResult = new IndexResult(
plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
}
}
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult)); // append the operation to the translog
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// On a document failure, the already-generated seq_no is recorded as a no-op
// in both the translog and Lucene so the sequence-number history has no gap.
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
index.startTime(), indexResult.getFailure().toString());
location = innerNoOp(noOp).getTranslogLocation();
} else {
location = null;
}
indexResult.setTranslogLocation(location);
}
if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
// Record the new version (and translog location, if tracked) in the live version map.
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
versionMap.maybePutIndexUnderLock(index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm()));
}
localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
if (indexResult.getTranslogLocation() == null) {
// the op is coming from the translog (and is hence persisted already) or it does not have a sequence number
assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
return indexResult;
}
} catch (RuntimeException | IOException e) {
try {
// Fatal document failures bring down the whole engine; others may fail it
// only if the exception is judged engine-fatal by maybeFailEngine.
if (e instanceof AlreadyClosedException == false && treatdocumentFailureAsTragicError(index)) {
failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
} else {
maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
}
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
}
以上代码可以看出,ES的写入操作是先写lucene,将数据写入到lucene内存后再写translog,这里和传统的WAL先写日志后写内存有所区别。ES之所以先写lucene后写log主要原因大概是写入Lucene时,Lucene会再对数据进行一些检查,有可能出现写入Lucene失败的情况。如果先写translog,那么就要处理写入translog成功但是写入Lucene一直失败的问题,所以ES采用了先写Lucene的方式。
写入Lucene
/**
 * Writes the parsed document into Lucene according to the chosen indexing
 * strategy (stale add / update / optimized add). Document-level failures are
 * captured in the returned IndexResult; engine-fatal failures are rethrown.
 */
private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
    throws IOException {
    assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
    assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
    assert plan.indexIntoLucene || plan.addStaleOpToLucene;
    // Stamp the document with its seq_no, primary term and version before writing.
    index.parsedDoc().updateSeqID(index.seqNo(), index.primaryTerm());
    index.parsedDoc().version().setLongValue(plan.versionForIndexing);
    try {
        if (plan.addStaleOpToLucene) {
            addStaleDocs(index.docs(), indexWriter);
        } else if (plan.useLuceneUpdatedocument) {
            assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true);
            updateDocs(index.uid(), index.docs(), indexWriter);
        } else {
            // The document does not exist yet, so the cheaper add path can be used;
            // double-check that assumption when assertions are enabled.
            assert assertDocDoesNotExist(index, canOptimizeAdddocument(index) == false);
            addDocs(index.docs(), indexWriter);
        }
        return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
    } catch (Exception ex) {
        // Same predicate as before, via De Morgan; evaluation order is preserved.
        final boolean engineFatal = ex instanceof AlreadyClosedException
            || indexWriter.getTragicException() != null
            || treatdocumentFailureAsTragicError(index);
        if (engineFatal) {
            throw ex;
        }
        // Plain document-level failure: report it in the result instead of failing the engine.
        return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), index.seqNo());
    }
}



