栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【elasticsearch】elasticsearch源码恢复流程

【elasticsearch】elasticsearch源码恢复流程

es 7.7
副本分片请求流程,函数入口:IndicesClusterStateService.createOrUpdateShards

routing table:get _cluster/state/routing_table.保存“索引->分片”的对应关系,即每个索引的每个分片在哪个节点

routing nodes:get _cluster/state/routing_nodes。后者保存“节点->分片”的对应关系,即每个节点分别有哪些索引的哪些分片。
获取本节点的routing table(实际为shard routing信息),即通过routing nodes拿到了当前节点的分片信息,通过分片获取到对应Index,再拿到IndexService,查看分片是否已经存在,如果不存在,就进入createShard,否则进到updateShard;createShards主要处理处于initializing的分片,即分片恢复也会进入

org.elasticsearch.indices.cluster.IndicesClusterStateService

    private void createOrUpdateShards(final ClusterState state) {
        RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
        if (localRoutingNode == null) {
            return;
        }

        DiscoveryNodes nodes = state.nodes();
        RoutingTable routingTable = state.routingTable();

        for (final ShardRouting shardRouting : localRoutingNode) { 
        //判断本地节点是否在routingNodes,如果在,说明本地节点有分片创建或更新的需求,否则跳过
            ShardId shardId = shardRouting.shardId();
            if (failedShardsCache.containsKey(shardId) == false) { //
                AllocatedIndex indexService = indicesService.indexService(shardId.getIndex());
                assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices";//探测indexService非空
                Shard shard = indexService.getShardOrNull(shardId.id());
                if (shard == null) {//shard不存在创建???----->
                    assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards"; //探测shard状态是否为INITIALIZING
                    createShard(nodes, routingTable, shardRouting, state);//副本恢复入口
                } else {//shard存在更新
                    updateShard(nodes, shardRouting, shard, routingTable, state);
                }
            }
        }
    }

IndicesClusterStateService.createShard函数判断shardRouting的类型,如果恢复类型为PEER,则寻找源节点,调用indicesService.createShard。

shardRouting恢复类型可选为:

EMPTY_STORE:recovery from an empty store 从空store恢复
EXISTING_STORE:recovery from an existing store 从存在的store恢复
PEER:recovery from a primary on another node 从其他节点的主恢复---副本分片走此流程
SNAPSHOT:recovery from a snapshot 从快照恢复
LOCAL_SHARDS:recovery from other shards of another index on the same node 从本节点的另一个index的其他shard恢复

主分片主要从Translog中自我恢复,尚未执行flush到磁盘的分段可以从tanslog中重建

副本分片走peer,

Why peer?

1)如果主副分片同时开始恢复的话,还要选主?so直接就让副本分片等待。。等待主分片恢复后,跟主分片对比,所以才走peer?

2)只有peer才会跟primary对比?

如何判断主副?本地会记录?

org.elasticsearch.indices.cluster.IndicesClusterStateService

    private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
        assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;

        DiscoveryNode sourceNode = null;
        if (shardRouting.recoverySource().getType() == Type.PEER)  { //判断类型
            sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); //寻找源节点
            if (sourceNode == null) {
                logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
                return;
            }
        }

        try {
            final long primaryTerm = state.metaData().index(shardRouting.index()).primaryTerm(shardRouting.id());
            logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
            RecoveryState  recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); //保存恢复信息,即当前恢复阶段、主分片、分片ID、source节点、target节点
            indicesService.createShard(   //函数入口
                    shardRouting,   
                    recoveryState,  
                    recoveryTargetService, 
                    new RecoveryListener(shardRouting, primaryTerm),  
                           //恢复状态改变callback(finishes or fails)
                    repositoriesService, //service responsible for snapshot/restore
                    failedShardHandler,  //shard fails 的callback
                    globalCheckpointSyncer,  //shard同步全局checkpoint的callback
                    retentionLeaseSyncer);  //shard syce租约的callback
        } catch (Exception e) {
            failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
        }
    }

寻找源节点:

源节点的确定分两种情况,如果当前shard本身不是primary shard,则源节点为primary shard所在节点,否则,如果当前shard正在搬迁中(从其他节点搬迁到本节点),则源节点为数据搬迁的源头节点。得到源节点后调用IndicesService.createShard,在该方法中调用方法IndexShard.startRecovery开始恢复。

org.elasticsearch.indices.cluster.IndicesClusterStateService

    private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes,
                                                               ShardRouting shardRouting) {
        DiscoveryNode sourceNode = null;
        if (!shardRouting.primary()) { //如果shard本身不是primary shard???---->
            ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();
            // 只能从started状态的primary恢复,否则继续轮询,
            
            if (primary.active()) { //判断主分片的状态---主分片优先恢复,副本分片等待
                sourceNode = nodes.get(primary.currentNodeId()); // 找到primary shard所在节点
                if (sourceNode == null) {
                    logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node.", primary);
                }
            } else {
                logger.trace("can't find replica source node because primary shard {} is not active.", primary);
            }
        } else if (shardRouting.relocatingNodeId() != null) { //如果正在搬迁
            sourceNode = nodes.get(shardRouting.relocatingNodeId());
            if (sourceNode == null) {
                logger.trace("can't find relocation source node for shard {} because it is assigned to an unknown node [{}].",
                    shardRouting.shardId(), shardRouting.relocatingNodeId()); // 找到搬迁的源节点
            }
        } else {
            throw new IllegalStateException("trying to find source node for peer recovery when routing state means no peer recovery: " +
                shardRouting);
        }
        return sourceNode;
    }

对于恢复类型为PEER的任务,恢复动作的真正执行者为PeerRecoveryTargetService

 final AllocatedIndices> indicesService;
 
 T createShard(
                ShardRouting shardRouting,
                RecoveryState recoveryState,
                PeerRecoveryTargetService recoveryTargetService,     //实际执行函数-------------------------
                PeerRecoveryTargetService.RecoveryListener recoveryListener, //listener
                RepositoriesService repositoriesService,
                Consumer onShardFailure,
                Consumer globalCheckpointSyncer,
                RetentionLeaseSyncer retentionLeaseSyncer) throws IOException;

PeerRecoveryTargetService.doRecovery: StartRecoveryRequest通过RPC发送到源节点:

org.elasticsearch.indices.recovery.PeerRecoveryTargetService

public void doRun() {
            doRecovery(recoveryId);
        }


private void doRecovery(final long recoveryId) {
        final StartRecoveryRequest request;
        final RecoveryState.Timer timer;
        CancellableThreads cancellableThreads;
        try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
            if (recoveryRef == null) {
                logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
                return;
            }
            final RecoveryTarget recoveryTarget = recoveryRef.target();
            timer = recoveryTarget.state().getTimer();
            cancellableThreads = recoveryTarget.cancellableThreads();
            try {
                assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
                logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
                recoveryTarget.indexShard().prepareForIndexRecovery();
                final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); //获取到startingseqno
                assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
                    "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
                request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);   // 将metadataSnapshot等信息包装成request------------------------
            } catch (final Exception e) {
                // this will be logged as warning later on...
                logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
                onGoingRecoveries.failRecovery(recoveryId,
                    new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
                return;
            }
        }
       //.....................此处省略若干行

        try {
            logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode());
            cancellableThreads.executeIO(() -> //向源节点发送请求,请求恢复----------------------------
            //在cancelableThreads后会继续执行,确保在网络或者其他情况导致传输延迟情况下中断任何阻塞调用,在moving异步执行后是不干净的,但是错过请求更难以排查且不可接受。
                transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
                    new TransportResponseHandler() {
                        @Override
                        public void handleResponse(RecoveryResponse recoveryResponse) {
                            final Timevalue recoveryTime = new Timevalue(timer.time());
                            // do this through ongoing recoveries to remove it from the collection
                            onGoingRecoveries.markRecoveryAsDone(recoveryId);
                            //..........此处省略若干行
                        }

                        @Override
                        public void handleException(TransportException e) {
                            handleException.accept(e);
                        }

                        @Override
                        public String executor() {
                            // we do some heavy work like refreshes in the response so fork off to the generic threadpool
                            return ThreadPool.Names.GENERIC;
                        }

                        @Override
                        public RecoveryResponse read(StreamInput in) throws IOException {
                            return new RecoveryResponse(in);
                        }
                    })
            );
        } catch (CancellableThreads.ExecutionCancelledException e) {
            logger.trace("recovery cancelled", e);
        } catch (Exception e) {
            handleException.accept(e);
        }
    }

IndexShard.java

Primary Terms: 由主节点分配给每个主分片,每次主分片发生变化时递增。主要作用是能够区别新旧两种主分片,只对最新的Terms进行操作。

Sequence Numbers: 标记发生在某个分片上的写操作。由主分片分配,只对写操作分配。假设索引test有两个主分片一个副本分片,当0号分片的序列号增加到5时,它的主分片离线,副本提升为新的主,对于后续的写操作,序列号从6开启递增。1号分片有自己独立的Sequence Numbers。

主分片在每次向副本转发写请求时,都会带上这两个值。

有了Primary Terms和Sequence Numbers,理论上好像就可以检测出分片之间的差异(从旧的主分片删除新的主分片操作历史中不存在的操作,并且将缺少的操作索引到旧主分片),但是当同时为每秒成百上千的事件做索引时,比较数百万个操作的历史是不切实际的,且耗费大量的存储成本,所以ES维护了一个GlobalCheckpoint的安全标记。

先来看下checkpoint的概念和作用:

GlobalCheckpoint: 全局检查点是所有活跃分片历史都已经对齐的序列号,即所有低于全局检查点的操作都保证已被所有活跃的分片处理完毕。这意味着,当主分片失效时,我们只需要比较新主分片和其他副本分片之间的最后一个全局检查点之后的操作即可。当就主分片恢复时,使用它知道的全局检查点,与新的主分片进行比较。这样,我们只需要进行小部分操作比较,而不是全部。

主分片负责推进全局检查点,它通过跟踪副本上完成的操作来实现。一旦检测到有副本分片已经超出给定序列号,它将相应的更新全局检查点。副本分片不会跟踪所有操作,而是维护一个本地检查点。

LocalCheckpoint: 本地检查点也是一个序列号,所有序列号低于它的操作都已在该分片上(写lucene和translog成功)处理完毕。

 public long recoverLocallyUpToGlobalCheckpoint() {
        assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
        if (state != IndexShardState.RECOVERING) {
            throw new IndexShardNotRecoveringException(shardId, state);
        }
        assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
        assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
        final Optional safeCommit;
        final long globalCheckpoint;
        try {
            final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);//获取最后提交到segment的translogUUID
            globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
          //获取translog的globalCheckpoint
            safeCommit = store.findSafeIndexCommit(globalCheckpoint);
          //找到safecommit-------->什么是安全的提交?
        } catch (org.apache.lucene.index.IndexNotFoundException e) {
            logger.trace("skip local recovery as no index commit found");
            return UNASSIGNED_SEQ_NO;
        } catch (Exception e) {
            logger.debug("skip local recovery as failed to find the safe commit", e);
            return UNASSIGNED_SEQ_NO;
        }
        try {
            maybeCheckIndex(); // 检查索引,重复的基于ops恢复操作发生
            recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
            if (safeCommit.isPresent() == false) {
                logger.trace("skip local recovery as no safe commit found");
                return UNASSIGNED_SEQ_NO;
            }
            assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
            if (safeCommit.get().localCheckpoint == globalCheckpoint) {
                logger.trace("skip local recovery as the safe commit is up to date; safe commit {} global checkpoint {}",
                    safeCommit.get(), globalCheckpoint);
                recoveryState.getTranslog().totalLocal(0);
                return globalCheckpoint + 1;
            }
            if (indexSettings.getIndexmetaData().getState() == IndexmetaData.State.CLOSE ||
                IndexmetaData.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) {
                logger.trace("skip local recovery as the index was closed or not allowed to write; safe commit {} global checkpoint {}",
                    safeCommit.get(), globalCheckpoint);
                recoveryState.getTranslog().totalLocal(0);
                return safeCommit.get().localCheckpoint + 1;
            }
            try {
                final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
                    recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
                    final int recoveredOps = runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
                        recoveryState.getTranslog()::incrementRecoveredOperations);
                    recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
                    return recoveredOps;
                };
                innerOpenEngineAndTranslog(() -> globalCheckpoint);
                getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
                logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
            } finally {
                synchronized (engineMutex) {
                    IOUtils.close(currentEngineReference.getAndSet(null));
                }
            }
        } catch (Exception e) {
            logger.debug(new ParameterizedMessage("failed to recover shard locally up to global checkpoint {}", globalCheckpoint), e);
            return UNASSIGNED_SEQ_NO;
        }
        try {
            // we need to find the safe commit again as we should have created a new one during the local recovery
            final Optional newSafeCommit = store.findSafeIndexCommit(globalCheckpoint);
            assert newSafeCommit.isPresent() : "no safe commit found after local recovery";
            return newSafeCommit.get().localCheckpoint + 1;
        } catch (Exception e) {
            logger.debug(new ParameterizedMessage(
                "failed to find the safe commit after recovering shard locally up to global checkpoint {}", globalCheckpoint), e);
            return UNASSIGNED_SEQ_NO;
        }
    }

安全提交—安全提交的所有操作最多只能是全局检查点----写入translog文件算安全提交,生成新的translog文件会更新checkpoint文件。

prepareCommit主要是将当前的translog设置为旧生代的,并创建最新的translog。具体做以下几件事:

​ 1.设置currentCommittingTranslog 为当前的translog,表明现在进行二阶段提交操作。并将当前trasnlog sync。下面的代码中 current 表示对当前translog进行写操作的对象, currentCommittingTranslog 表示为对当前translog只读的对象。

​ 2.更新checkpoint文件,本文中说的translog包含translog_N.tlog和translog_N.ckp文件。当前最新的translog中checkpoint文件名为 translog.ckp文件,而旧生代的translog的checkpoint文件名为 translog_N.ckp。所以当前translog变为旧生代时候,需要创建translog_N.ckp文件,并将translog.ckp的内容拷贝过去。然后将translog_N.ckp文件持久落盘,由于有新文件生成,还需要把目录也持久落盘。

​ 3.生成新的translog文件,并设置为当前的translog。新生成的translog文件名为 translog_N+1.log,最新的checkpoint文件名都是translog.ckp. 所以有新的translog文件生成后,将会重写translog.ckp文件。translog.ckp文件中记录了最新的translog文件的代数、操作数和偏移量。通过checkpoint文件可以很容易的找到最新的translog文件。

​ 4.和视图(View)有关,View用来主从同步translog,当replica shard向 primary shard同步数据时, primary shard 会生成一个View,即相当于对当前的translog生成一个快照。 View表示当前对应的translog,一般情况下对应最新的那代translog。如果正在进行二阶段提交操作,即处在prepareCommit 和 commit之间,则此期间的View需要包含上一代的translog (currentCommittingTranslog ) 和最新代的translog。

​ 5.IOUtils.close(oldCurrent),此处主要用于减少前一代文件的引用计数器。由于有一个写对象(oldCurrent) 和 一个读对象(currentCommittingTranslog) 都打开了前一代文件,所以前一代文件的引用计数器至少为2, 此处关闭写对象,引用计数器-1.

public Optional findSafeIndexCommit(long globalCheckpoint) throws IOException {
    final List commits = DirectoryReader.listCommits(directory);
    assert commits.isEmpty() == false : "no commit found";
    final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
    final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet());
    //安全提交的所有操作最多只能是全局检查点
    // all operations of the safe commit must be at most the global checkpoint.
    if (commitInfo.maxSeqNo <= globalCheckpoint) {
        return Optional.of(commitInfo);
    } else {
        return Optional.empty();
    }
}

获取shard的metadataSnapshot,该结构中包含shard的段信息,如syncid、checksum、doc数等,然后封装为 StartRecoveryRequest

org.elasticsearch.indices.recovery.PeerRecoveryTargetService

    public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, DiscoveryNode localNode,
                                                               RecoveryTarget recoveryTarget, long startingSeqNo) {
        final StartRecoveryRequest request;
        logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

        Store.metadataSnapshot metadataSnapshot;
        try {
            metadataSnapshot = recoveryTarget.indexShard().snapshotStoremetadata();
            //获取shard的metadataSnapshot,该结构中包含shard的段信息,如syncid、checksum、doc数等,确保当前translog和Lucene是一致的,否则不得不丢弃Lucene index ---------------------
            try {
                final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); //获取TranslogUUID
                final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);  //通过translog位置和期望的transloguuid 获取全局检查点
                assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; //判断是否全局检查点+1 大于 startingSeqNo
            } catch (IOException | TranslogCorruptedException e) {
                logger.warn(new ParameterizedMessage("error while reading global checkpoint from translog, " +
                    "resetting the starting sequence number from {} to unassigned and recovering as if there are none", startingSeqNo), e);
                metadataSnapshot = Store.metadataSnapshot.EMPTY;
                startingSeqNo = UNASSIGNED_SEQ_NO;
            }
        } catch (final org.apache.lucene.index.IndexNotFoundException e) {
            // happens on an empty folder. no need to log
            assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo;
            logger.trace("{} shard folder empty, recovering all files", recoveryTarget);
            metadataSnapshot = Store.metadataSnapshot.EMPTY;
        } catch (final IOException e) {
            if (startingSeqNo != UNASSIGNED_SEQ_NO) {
                logger.warn(new ParameterizedMessage("error while listing local files, resetting the starting sequence number from {} " +
                    "to unassigned and recovering as if there are none", startingSeqNo), e);
                startingSeqNo = UNASSIGNED_SEQ_NO;
            } else {
                logger.warn("error while listing local files, recovering as if there are none", e);
            }
            metadataSnapshot = Store.metadataSnapshot.EMPTY;
        }
        logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
        request = new StartRecoveryRequest( //创建recovery quest -------------------
            recoveryTarget.shardId(),
            recoveryTarget.indexShard().routingEntry().allocationId().getId(),
            recoveryTarget.sourceNode(),
            localNode,
            metadataSnapshot,
            recoveryTarget.state().getPrimary(),
            recoveryTarget.recoveryId(),
            startingSeqNo);
        return request;
    }

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5fVTcqdl-1642761789334)(/Users/changjing/Desktop/截屏2022-01-20 下午1.07.15.png)]

源节点接收到恢复请求的入口:源节点控制恢复,控制权转移

org.elasticsearch.indices.recovery.PeerRecoverySourceService

    class StartRecoveryTransportRequestHandler implements TransportRequestHandler {
        @Override
        public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
            recover(request, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request)); //源节点接收到请求后会调用恢复的入口函数recover
        }
    }

recover方法根据request得到shard并构造RecoverySourceHandler对象,然后调用handler.recoverToTarget进入恢复的执行体:

org.elasticsearch.indices.recovery.PeerRecoverySourceService

    private void recover(StartRecoveryRequest request, ActionListener listener) {
        final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); //获取indexService
        final IndexShard shard = indexService.getShard(request.shardId().id()); //获取shardid

        final ShardRouting routingEntry = shard.routingEntry(); //启动routingEntry

       //判断routingEntry是否为主、routingEntry是否active,否抛错
        if (routingEntry.primary() == false || routingEntry.active() == false) {
            throw new DelayRecoveryException("source shard [" + routingEntry + "] is not an active primary");
        } 
       
        if (request.isPrimaryRelocation() && (routingEntry.relocating() == false ||
            routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) {
            logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}",
                request.shardId(), request.targetNode());
            throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]");
        }
       
       //根据request得到shard并构造RecoverySourceHandler对象
        RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
        logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(),
            request.targetNode());
        handler.recoverToTarget(ActionListener.runAfter(listener, () -> ongoingRecoveries.remove(shard, handler)));  //恢复入口
    }

恢复流程:

org.elasticsearch.indices.recovery.RecoverySourceHandler

 public void recoverToTarget(ActionListener listener) {
        final Closeable releaseResources = () -> IOUtils.close(resources);
        final ActionListener wrappedListener = ActionListener.notifyOnce(listener);
        try {
            cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
                final RuntimeException e;
                if (shard.state() == IndexShardState.CLOSED) { //检查shard是否close
                    e = new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");
                } else {
                    e = new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
                }
                if (beforeCancelEx != null) {
                    e.addSuppressed(beforeCancelEx);
                }
                IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
                throw e;
            });
            final Consumer onFailure = e -> {
                assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[onFailure]");
                IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
            };

            final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled();
            final SetOnce retentionLeaseRef = new SetOnce<>();
            ------------------------------开始----------------
            runUnderPrimaryPermit(() -> {  //获取RoutingTable,获取AllocationId,分配保留租约
                final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
                ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
                if (targetShardRouting == null) {
                    logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),
                        request.targetNode());
                    throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
                }
                assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
                retentionLeaseRef.set(
                    shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
            }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
                shard, cancellableThreads, logger);
            final Engine.HistorySource historySource;
            if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) {//软删除为true且租约不为空
                historySource = Engine.HistorySource.INDEX;
            } else {
                historySource = Engine.HistorySource.TRANSLOG;
            }
            final Closeable retentionLock = shard.acquireHistoryRetentionLock(historySource);
            resources.add(retentionLock);  //加保留锁,获取translog文件和lucene softDelete文件的锁,防止在操作时,文件被修改。
            final long startingSeqNo;
            //sequenceNumber为true条件
            final boolean isSequenceNumberbasedRecovery
                = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
                && isTargetSameHistory()
                && shard.hasCompleteHistoryOperations("peer-recovery", historySource, request.startingSeqNo())
                && (historySource == Engine.HistorySource.TRANSLOG ||
                   (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
             //即使有完整的租约,在计算isSequenceNumberbasedRecovery会检查是是否有完整的历史操作记录 ,因为从7.4或者更早版本进行滚动升级,创建了一些最初不满意的租约,在有些情况并未像我们希望的那样持有租约,软删除是很方便,但如果没有完整的历史,基于序列号恢复将是一场灾难

            if (isSequenceNumberbasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) {
                // 需要的历史记录已存在租约,不需要加单独的保留锁
                retentionLock.close();
                logger.trace("history is retained by {}", retentionLeaseRef.get());
            } else {
               //在调用shard.hasCompleteHistoryOperations()前所有的历史都需要加上保留,在需要使用 safe commit之前,这样保证在saft commit可以确认所有操作,在恢复操作进行时间段,本地checkpoint会保存
                logger.trace("history is retained by retention lock");
            }

            final StepListener sendFileStep = new StepListener<>();
            final StepListener prepareEngineStep = new StepListener<>();
            final StepListener sendSnapshotStep = new StepListener<>();
            final StepListener finalizeStep = new StepListener<>();

            //如果可以基于请求中的sequenceNumber进行恢复,则跳过phase1
            if (isSequenceNumberbasedRecovery) {
                logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
                startingSeqNo = request.startingSeqNo();
                if (retentionLeaseRef.get() == null) {
                    createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY));
                } else {
                    sendFileStep.onResponse(SendFileResult.EMPTY);
                }
            } else {
                final Engine.IndexCommitRef safeCommitRef;
                try {
                    safeCommitRef = shard.acquireSafeIndexCommit();
                    resources.add(safeCommitRef);
                } catch (final Exception e) {
                    throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
                }
    //尽量复制足够的操作到正在恢复的peer,以便如果要将其提升为主节点,可以基于operations恢复其他副本。如果我们不使用保留租约,那么会保守地复制所有可用的操作。如果我们使用保留租约,那么“足够的操作”只是从安全提交的本地检查点开始的操作,因为当使用软删除时,安全提交至少会保留与其他任何内容一样多的历史记录。因为当使用软删除时,安全提交至少会保留与history一样多的记录。安全提交通常会包含当前保留租约集保留的所有历史记录,但这并不能保证:从不同主节点的一个较早的peer恢复,如果该主节点已经丢弃了某些历史,创建了保留租约。因为全局checkpoint前进时我们丢弃了history,并且新的安全提交未创建。无论如何,这是尽最大努力,因为未来的恢复总是可以回退到基于文件的恢复,并且只有在稳定下来之前这个主节点fail时才会真正出现问题。 
                // -----即在数据恢复过程中,如果从节点没有存储未被写入文件的history。在主节点部分history已被全局checkpoint覆盖删除。主节点在恢复过程中突然fail,数据会存在丢失。
                startingSeqNo = softDeletesEnabled
                    ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L
                    : 0;
                logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);

                try {
                    final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo);
                    final Releasable releaseStore = acquireStore(shard.store());
                    resources.add(releaseStore);
                    sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
                        try {
                            IOUtils.close(safeCommitRef, releaseStore);
                        } catch (final IOException ex) {
                            logger.warn("releasing snapshot caused exception", ex);
                        }
                    });

                    final StepListener deleteRetentionLeaseStep = new StepListener<>();
                    runUnderPrimaryPermit(() -> {
                            try {
          // If the target previously had a copy of this shard then a file-based recovery might move its global
          // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
          // new one later on in the recovery.
                                shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),
                                    new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
                                        deleteRetentionLeaseStep, false));
                            } catch (RetentionLeaseNotFoundException e) {
                                logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
                                deleteRetentionLeaseStep.onResponse(null);
                            }
                        }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",
                        shard, cancellableThreads, logger);

                    deleteRetentionLeaseStep.whenComplete(ignored -> {
                        assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
                        phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep); //第一阶段,比较syncid和segment,然后得出有差异的部分,主动将数据推送给请求方,完成后删除租约-------------------
                    }, onFailure);

                } catch (final Exception e) {
                    throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
                }
            }
            assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;

            sendFileStep.whenComplete(r -> {
                assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
                // For a sequence based recovery, the target can keep its local translog
                prepareTargetForTranslog(
                    shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo), prepareEngineStep);
            }, onFailure);

            prepareEngineStep.whenComplete(prepareEngineTime -> {
                assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]");
                
                runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
                    shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);

                final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
                logger.trace("snapshot translog for recovery; current size is [{}]",
                    shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo));
                final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);
                resources.add(phase2Snapshot);
                retentionLock.close();

                // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
                // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
                final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
                final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
                final RetentionLeases retentionLeases = shard.getRetentionLeases();
                final long mappingVersionOnPrimary = shard.indexSettings().getIndexmetaData().getMappingVersion();
                //第二阶段,发送translog-------------------
                phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
                    retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);
                sendSnapshotStep.whenComplete(
                    r -> IOUtils.close(phase2Snapshot),
                    e -> {
                        IOUtils.closeWhileHandlingException(phase2Snapshot);
                        onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));
                    });

            }, onFailure);

            // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2
            final long trimAboveSeqNo = startingSeqNo - 1;
            sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);

            finalizeStep.whenComplete(r -> {
                final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
                final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
                final SendFileResult sendFileResult = sendFileStep.result();
                final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                    sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                    sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
                    prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
                try {
                    wrappedListener.onResponse(response);
                } finally {
                    IOUtils.close(resources);
                }
            }, onFailure);
        } catch (Exception e) {
            IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
        }
    }

phase1:恢复segment文件

从上面代码可以看出,phase1的具体逻辑是,首先拿到待恢复shard的metadataSnapshot从而得到recoverySourceSyncId,根据request拿到recoveryTargetSyncId,比较两边的syncid,如果相同再比较源和目标的文档数,如果也相同,说明在当前提交点之前源和目标的shard对应的segments都相同,因此不用恢复segment文件。如果两边的syncid不同,说明segment文件有差异,则需要找出所有有差异的文件进行恢复。通过比较recoverySourcemetadata和recoveryTargetSnapshot的差异性,可以找出所有有差别的segment文件。这块逻辑如下:

 void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) {
        cancellableThreads.checkForCancel();
        final Store store = shard.store();
        try {
            StopWatch stopWatch = new StopWatch().start();
            final Store.metadataSnapshot recoverySourcemetadata;
            try {
                recoverySourcemetadata = store.getmetadata(snapshot);
            } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatToonewException ex) {
                shard.failShard("recovery", ex);
                throw ex;
            }
            for (String name : snapshot.getFileNames()) {  
                final StoreFilemetaData md = recoverySourcemetadata.get(name);
                if (md == null) {
                    logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourcemetadata.asMap());
                    throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " +
                            recoverySourcemetadata.asMap().size() + " files", name);
                }
            }
            if (canSkipPhase1(recoverySourcemetadata, request.metadataSnapshot()) == false) {//可以跳过phase1
                final List phase1FileNames = new ArrayList<>();
                final List phase1FileSizes = new ArrayList<>();
                final List phase1ExistingFileNames = new ArrayList<>();
                final List phase1ExistingFileSizes = new ArrayList<>();

                // Total size of segment files that are recovered
                long totalSizeInBytes = 0;
                // Total size of segment files that were able to be re-used
                long existingTotalSizeInBytes = 0;

                // Generate a "diff" of all the identical, different, and missing
                // segment files on the target node, using the existing files on
                // the source node
                final Store.RecoveryDiff diff = recoverySourcemetadata.recoveryDiff(request.metadataSnapshot());
                for (StoreFilemetaData md : diff.identical) {
                    phase1ExistingFileNames.add(md.name());
                    phase1ExistingFileSizes.add(md.length());
                    existingTotalSizeInBytes += md.length();
                    if (logger.isTraceEnabled()) {
                        logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +
                                        " size [{}]", md.name(), md.checksum(), md.length());
                    }
                    totalSizeInBytes += md.length();
                }
                List phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
                phase1Files.addAll(diff.different);
                phase1Files.addAll(diff.missing);
                for (StoreFilemetaData md : phase1Files) {
                    if (request.metadataSnapshot().asMap().containsKey(md.name())) {
                        logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
                            md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
                    } else {
                        logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());
                    }
                    phase1FileNames.add(md.name());
                    phase1FileSizes.add(md.length());
                    totalSizeInBytes += md.length();
                }

                logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
                    phase1FileNames.size(), new ByteSizevalue(totalSizeInBytes),
                    phase1ExistingFileNames.size(), new ByteSizevalue(existingTotalSizeInBytes));
                final StepListener sendFileInfoStep = new StepListener<>();
                final StepListener sendFilesStep = new StepListener<>();
                final StepListener createRetentionLeaseStep = new StepListener<>();
                final StepListener cleanFilesStep = new StepListener<>();
                cancellableThreads.checkForCancel();
                recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
                        phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);

                sendFileInfoStep.whenComplete(r ->
                    sendFiles(store, phase1Files.toArray(new StoreFilemetaData[0]), translogOps, sendFilesStep), listener::onFailure);

                sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);

                createRetentionLeaseStep.whenComplete(retentionLease ->
                    {
                        final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();
                        assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint
                            : retentionLease + " vs " + lastKnownGlobalCheckpoint;
                        // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want
                        // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica
                        // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on
                        // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint.
                        cleanFiles(store, recoverySourcemetadata, translogOps, lastKnownGlobalCheckpoint, cleanFilesStep);
                    },
                    listener::onFailure);

                final long totalSize = totalSizeInBytes;
                final long existingTotalSize = existingTotalSizeInBytes;
                cleanFilesStep.whenComplete(r -> {
                    final Timevalue took = stopWatch.totalTime();
                    logger.trace("recovery [phase1]: took [{}]", took);
                    listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
                        phase1ExistingFileSizes, existingTotalSize, took));
                }, listener::onFailure);
            } else {
                logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourcemetadata.getSyncId());

                // but we must still create a retention lease
                final StepListener createRetentionLeaseStep = new StepListener<>();
                createRetentionLease(startingSeqNo, createRetentionLeaseStep);
                createRetentionLeaseStep.whenComplete(retentionLease -> {
                    final Timevalue took = stopWatch.totalTime();
                    logger.trace("recovery [phase1]: took [{}]", took);
                    listener.onResponse(new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(),
                        Collections.emptyList(), 0L, took));
                }, listener::onFailure);

            }
        } catch (Exception e) {
            throw new RecoverFilesRecoveryException(request.shardId(), 0, new ByteSizevalue(0L), e);
        }
    }

可以跳过phase1,源分片和目标分片的syncId一致

   boolean canSkipPhase1(Store.metadataSnapshot source, Store.metadataSnapshot target) {
        if (source.getSyncId() == null || source.getSyncId().equals(target.getSyncId()) == false) {
            return false;   //条件1:源的syncid和目标的syncid一致,且不为null
        }
        if (source.getNumDocs() != target.getNumDocs()) {//条件2:源的docnum和目标的docnum一致
            throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number " +
                "of docs differ: " + source.getNumDocs() + " (" + request.sourceNode().getName() + ", primary) vs " + target.getNumDocs()
                + "(" + request.targetNode().getName() + ")");
        }
        SequenceNumbers.CommitInfo sourceSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(source.getCommitUserData().entrySet());
        SequenceNumbers.CommitInfo targetSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(target.getCommitUserData().entrySet());
        if (sourceSeqNos.localCheckpoint != targetSeqNos.localCheckpoint || targetSeqNos.maxSeqNo != sourceSeqNos.maxSeqNo) {
    //条件3: 基于源seqno的本地chckpoint和基于目标seqno的本地checkpoint一致
            final String message = "try to recover " + request.shardId() + " with sync id but " +
                "seq_no stats are mismatched: [" + source.getCommitUserData() + "] vs [" + target.getCommitUserData() + "]";
            assert false : message;
            throw new IllegalStateException(message);
        }
        return true;  //可以跳过phase1,源分片和目标分片的syncId一致
    }
    这里将所有的segment file分为三类:identical(相同)、different(不同)、missing(target缺失)。然后将different和missing的segment files作为第一阶段需要恢复的文件发送到target node。发送完segment files后,源节点还会向目标节点发送消息以通知目标节点清理临时文件,然后也会发送消息通知目标节点打开引擎准备接收translog,这里需要注意的是,这两次网络通信都会调用 PlainTransportFuture.txGet() 方法阻塞等待 对端回复。至此,第一阶段的恢复逻辑完毕。

public RecoveryDiff recoveryDiff(metadataSnapshot recoveryTargetSnapshot) {
    final List identical = new ArrayList<>();  // 相同的file 
    final List different = new ArrayList<>();  // 不同的file
    final List missing = new ArrayList<>();   // 缺失的file
    final Map> perSegment = new HashMap<>();
    final List perCommitStoreFiles = new ArrayList<>();
    ... ...
    for (List segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
        identicalFiles.clear();
        boolean consistent = true;
        for (StoreFilemetaData meta : segmentFiles) {
            StoreFilemetaData storeFilemetaData = recoveryTargetSnapshot.get(meta.name());
            if (storeFilemetaData == null) {
                consistent = false;
                missing.add(meta); // 该segment在target node中不存在,则加入到missing
            } else if (storeFilemetaData.isSame(meta) == false) {
                consistent = false;
                different.add(meta); // 存在但不相同,则加入到different
            } else {
                identicalFiles.add(meta);  // 存在且相同
            }
        }
        if (consistent) {
            identical.addAll(identicalFiles);
        } else {
            // make sure all files are added - this can happen if only the deletes are different
            different.addAll(identicalFiles);
        }
    }
    RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical), Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
    return recoveryDiff;
}

phase2:发送translog

第二阶段的逻辑比较简单,只需将translog view到当前时间之间的所有translog发送给源节点即可。

void phase2(
            final long startingSeqNo,
            final long endingSeqNo,
            final Translog.Snapshot snapshot,
            final long maxSeenAutoIdTimestamp,
            final long maxSeqNoOfUpdatesOrDeletes,
            final RetentionLeases retentionLeases,
            final long mappingVersion,
            final ActionListener listener) throws IOException {
        if (shard.state() == IndexShardState.CLOSED) {
            throw new IndexShardClosedException(request.shardId());
        }
        logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");

        final AtomicInteger skippedOps = new AtomicInteger();
        final AtomicInteger totalSentOps = new AtomicInteger();
        final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch.
        final CheckedSupplier, IOException> readNextBatch = () -> {
            // We need to synchronized Snapshot#next() because it's called by different threads through sendBatch.
            // Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible.
            synchronized (snapshot) {
                final List ops = lastBatchCount.get() > 0 ? new ArrayList<>(lastBatchCount.get()) : new ArrayList<>();
                long batchSizeInBytes = 0L;
                Translog.Operation operation;
                while ((operation = snapshot.next()) != null) {
                    if (shard.state() == IndexShardState.CLOSED) {
                        throw new IndexShardClosedException(request.shardId());
                    }
                    cancellableThreads.checkForCancel();
                    final long seqNo = operation.seqNo();
                    if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
                        skippedOps.incrementAndGet();
                        continue;
                    }
                    ops.add(operation);
                    batchSizeInBytes += operation.estimateSize();
                    totalSentOps.incrementAndGet();

                    // check if this request is past bytes threshold, and if so, send it off
                    if (batchSizeInBytes >= chunkSizeInBytes) {
                        break;
                    }
                }
                lastBatchCount.set(ops.size());
                return ops;
            }
        };

        final StopWatch stopWatch = new StopWatch().start();
        final ActionListener batchedListener = ActionListener.map(listener,
            targetLocalCheckpoint -> {
                assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
                    : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
                    snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
                stopWatch.stop();
                final Timevalue tookTime = stopWatch.totalTime();
                logger.trace("recovery [phase2]: took [{}]", tookTime);
                return new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime);
            }
        );

        sendBatch(
                readNextBatch,
                true,
                SequenceNumbers.UNASSIGNED_SEQ_NO,
                snapshot.totalOperations(),
                maxSeenAutoIdTimestamp,
                maxSeqNoOfUpdatesOrDeletes,
                retentionLeases,
                mappingVersion,
                batchedListener);
    }
目标节点开始恢复

接收segment

对应上一小节源节点恢复的第一阶段,源节点将所有有差异的segment发送给目标节点,目标节点接收到后会将segment文件落盘。segment files的写入函数为RecoveryTarget.writeFileChunk:

org.elasticsearch.indices.recovery

        public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                final RecoveryTarget recoveryTarget = recoveryRef.target();
                final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
                if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
                    indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
                }

                RateLimiter rateLimiter = recoverySettings.rateLimiter();
                if (rateLimiter != null) {
                    long bytes = bytesSinceLastPause.addAndGet(request.content().length());
                    if (bytes > rateLimiter.getMinPauseCheckBytes()) {
                        // Time to pause
                        bytesSinceLastPause.addAndGet(-bytes);
                        long throttleTimeInNanos = rateLimiter.pause(bytes);
                        indexState.addTargetThrottling(throttleTimeInNanos);
                        recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
                    }
                }
                final ActionListener listener = new ChannelActionListener<>(channel, Actions.FILE_CHUNK, request);
                recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(),     //接收信息调用writeFileChunk函数
                    request.totalTranslogOps(), ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));  //准备接收translog
            }
        }
    }

public void writeFileChunk(StoreFilemetaData filemetaData, long position, BytesReference content,
                               boolean lastChunk, int totalTranslogOps, ActionListener listener) {
        try {
            state().getTranslog().totalOperations(totalTranslogOps);
            multiFileWriter.writeFileChunk(filemetaData, position, content, lastChunk);
            listener.onResponse(null);
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

public void writeFileChunk(StoreFilemetaData filemetaData, long position, BytesReference content, boolean lastChunk)
        throws IOException {
        assert Transports.assertNotTransportThread("multi_file_writer");
        final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(filemetaData.name(), name -> new FileChunkWriter());
        writer.writeChunk(new FileChunk(filemetaData, content, position, lastChunk));
    }

准备接收translog

    class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler {

        @Override
        public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                final ActionListener listener = new ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
                recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps(),
                    ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
            }
        }
    }

重放translog

接收并重放translog

打开引擎后,便可以根据translog中的命令进行相应的回放动作,回放的逻辑和正常的写入、删除类似,这里需要根据translog还原出操作类型和操作数据,并根据操作数据构建相应的数据对象,然后再调用上一步打开的engine执行相应的操作,这块逻辑如下:

    class TranslogOperationsRequestHandler implements TransportRequestHandler {

        @Override
        public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel,
                                    Task task) throws IOException {
            try (RecoveryRef recoveryRef =
                     onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
                final RecoveryTarget recoveryTarget = recoveryRef.target();
                final ActionListener listener =
                    new ChannelActionListener<>(channel, Actions.TRANSLOG_OPS, request);
                final Consumer retryonMappingException = exception -> {
                    // in very rare cases a translog replay from primary is processed before a mapping update on this node
                    // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
                    logger.debug("delaying recovery due to missing mapping changes", exception);
                    // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
                    // canceled)
                    observer.waitForNextChange(new ClusterStateObserver.Listener() {
                        @Override
                        public void onNewClusterState(ClusterState state) {
                            try {
                                messageReceived(request, channel, task);
                            } catch (Exception e) {
                                listener.onFailure(e);
                            }
                        }

                        @Override
                        public void onClusterServiceClose() {
                            listener.onFailure(new ElasticsearchException(
                                "cluster service was closed while waiting for mapping updates"));
                        }

                        @Override
                        public void onTimeout(Timevalue timeout) {
                            // note that we do not use a timeout (see comment above)
                            listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
                                "(timeout [" + timeout + "])"));
                        }
                    });
                };
                final IndexmetaData indexmetaData = clusterService.state().metaData().index(request.shardId().getIndex());
                final long mappingVersionOnTarget = indexmetaData != null ? indexmetaData.getMappingVersion() : 0L;
                recoveryTarget.indexTranslogOperations(
                        request.operations(),
                        request.totalTranslogOps(),
                        request.maxSeenAutoIdTimestamponPrimary(),
                        request.maxSeqNoOfUpdatesOrDeletesonPrimary(),
                        request.retentionLeases(),
                        request.mappingVersionOnPrimary(),
                        ActionListener.wrap(
                                checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
                                e -> {
                                    // do not retry if the mapping on replica is at least as recent as the mapping
                                    // that the primary used to index the operations in the request.
                                    if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
                                        retryOnMappingException.accept(e);
                                    } else {
                                        listener.onFailure(e);
                                    }
                                })
                );
            }
        }
    }

恢复完成:

   class FinalizeRecoveryRequestHandler implements TransportRequestHandler {

        @Override
        public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                final ActionListener listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request);
                recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(),
                    ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
            }
        }
    } 

public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, ActionListener listener) {
        ActionListener.completeWith(listener, () -> {
            indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
            // Persist the global checkpoint.
            indexShard.sync();
            indexShard.persistRetentionLeases();
            if (trimAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
 // 我们会删除所有高于 trimAboveSeqNo 的 translog 操作,因为在phase2从源节点收到了相同或更新的副本,这里并不严格要求滚动新的 translog 生成,因为我们不会修改当前生成。这只是为了满足当前一代没有任何需要被修改的操作(参见 TranslogWriter#assertNoSeqAbove),这个假设不适用于对等恢复,因为我们可以从primary term接收到高于 startingSeqNo 的操作。
 // We should erase all translog operations above trimAboveSeqNo as we have received either the same or a newer copy from the recovery source in phase2. Rolling a new translog generation is not strictly required here for we won't trim the current generation. It's merely to satisfy the assumption that the current generation does not have any operation that would be trimmed (see TranslogWriter#assertNoSeqAbove). This assumption does not hold for peer recovery because we could have received operations above startingSeqNo from the previous primary terms.
                indexShard.rollTranslogGeneration();
                //在我们滚动一个新的 translog 后,可以达到刷新或 translog 生成阈值
                // the flush or translog generation threshold can be reached after we roll a new translog
                indexShard.afterWriteOperation();
                indexShard.trimOperationOfPreviousPrimaryTerms(trimAboveSeqNo);
            }
            if (hasUncommittedOperations()) {
                indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
            }
            indexShard.finalizeRecovery();
            return null;
        });
    }

shard recovery的所有流程都已完成。

由于phase1阶段完成后,从分片便可正常处理写入操作,而此时从分片的写入和phase2阶段的translog回放时并行执行的,如果translog的回放慢于正常的写入操作,那么可能会导致老的数据后写入,造成数据不一致。ES为了保证数据的一致性在进行写入操作时,会比较当前写入的版本和lucene文档版本号,如果当前版本更小,说明是旧数据则不会将文档写入lucene。相关代码如下:

final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocbasedOnVersions(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
    plan = IndexingStrategy.skipAsStale(false, index.version());
} 

文章:

https://cloud.tencent.com/developer/article/1370385

https://blog.csdn.net/lisi1129/article/details/111547995

https://mp.weixin.qq.com/s/9F1hwUFI690a8JaTmWWU3g

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711458.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号