栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka 3.0 日志定时清理(源码)

kafka 3.0 日志定时清理(源码)

文章目录

1、定时任务入口2、LopManager(这个是日志抽象层,实际逻辑不在这里)

(1) 把日志清理加入定时任务中 3、清理符合条件的日志

(1)deletableSegments(把需要删除的segment加入待删除的集合)(2) deleteSegments(对待删除的segment集合删除)

1、定时任务入口

这里选择kraft的模式启动的定时任务,所以入口是在BrokerServer.scala文件中,如果选择ZooKeeper模式的入口在KafkaServer.scala

 def startup(): Unit = {
    if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
    try {
      info("Starting broker")

      ,
      kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
      kafkaScheduler.startup()
	  //省略。。。
      //创建日志管理器,但不要启动它,因为我们需要延迟任何潜在的不干净关闭日志恢复
      // 直到我们赶上元数据日志并拥有最新的主题和代理配置。
      logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
        brokerTopicStats, logDirFailureChannel, keepPartitionmetadataFile = true)
		//省略。。。
    } catch {
      case e: Throwable =>
        maybeChangeStatus(STARTING, STARTED)
        fatal("Fatal error during broker startup. Prepare to shutdown", e)
        shutdown()
        throw e
    }
  }
2、LopManager(这个是日志抽象层,实际逻辑不在这里)
object LogManager {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
  val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000

  def apply(config: KafkaConfig,
            initialOfflineDirs: Seq[String],
            configRepository: ConfigRepository,
            kafkaScheduler: KafkaScheduler,
            time: Time,
            brokerTopicStats: BrokerTopicStats,
            logDirFailureChannel: LogDirFailureChannel,
            keepPartitionmetadataFile: Boolean): LogManager = {
    val defaultProps = LogConfig.extractLogConfigMap(config)

    LogConfig.validatevalues(defaultProps)
    val defaultLogConfig = LogConfig(defaultProps)

    val cleanerConfig = LogCleaner.cleanerConfig(config)

    new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
      initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
      configRepository = configRepository,
      initialDefaultConfig = defaultLogConfig,
      cleanerConfig = cleanerConfig,
      recoveryThreadsPerDataDir = config.numRecoveryThreadsPerDataDir,
      flushCheckMs = config.logFlushSchedulerIntervalMs,
      flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
      flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
      //log.retention.check.interval.ms 日志保留检查间隔 ms
      retentionCheckMs = config.logCleanupIntervalMs,
      maxPidExpirationMs = config.transactionalIdExpirationMs,
      scheduler = kafkaScheduler,
      brokerTopicStats = brokerTopicStats,
      logDirFailureChannel = logDirFailureChannel,
      time = time,
      keepPartitionmetadataFile = keepPartitionmetadataFile,
      interBrokerProtocolVersion = config.interBrokerProtocolVersion)
  }

}

上面的new LopManager 是下面的

(1) 把日志清理加入定时任务中
@threadsafe
class LogManager(logDirs: Seq[File],
                 initialOfflineDirs: Seq[File],
                 configRepository: ConfigRepository,
                 val initialDefaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 recoveryThreadsPerDataDir: Int,
                 val flushCheckMs: Long,
                 val flushRecoveryOffsetCheckpointMs: Long,
                 val flushStartOffsetCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 val maxPidExpirationMs: Int,
                 interBrokerProtocolVersion: ApiVersion,
                 scheduler: Scheduler,
                 brokerTopicStats: BrokerTopicStats,
                 logDirFailureChannel: LogDirFailureChannel,
                 time: Time,
                 val keepPartitionmetadataFile: Boolean) extends Logging with KafkaMetricsGroup {

  import LogManager._
//省略。。。。

  
  def startup(topicNames: Set[String]): Unit = {
    // ensure consistency between default config and overrides
    val defaultConfig = currentDefaultConfig
    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
  }
  private[log] def startupWithConfigOverrides(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig]): Unit = {
    loadLogs(defaultConfig, topicConfigOverrides) // this could take a while if shutdown was not clean

    
    if (scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention",
      					//cleanupLogs 是清理日志的入口
                         cleanupLogs _,
                         delay = InitialTaskDelayMs,
                         period = retentionCheckMs,
                         TimeUnit.MILLISECONDS)
     //省略。。。。。
    }
    if (cleanerConfig.enableCleaner) {
      _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
      _cleaner.startup()
    }
  }
 
}

3、清理符合条件的日志
 
  def cleanupLogs(): Unit = {
    debug("Beginning log cleanup...")
    var total = 0
    val startMs = time.milliseconds
	//省略。。。。  
    try {
      deletableLogs.foreach {
        //topicPartition代表主题的分区,log是此topic,此分区的日志统计
        case (topicPartition, log) =>
          debug(s"Garbage collecting '${log.name}'")
          total += log.deleteOldSegments()

          val futureLog = futureLogs.get(topicPartition)
          if (futureLog != null) {
            // clean future logs 清理未来的日志
            debug(s"Garbage collecting future log '${futureLog.name}'")
            total += futureLog.deleteOldSegments()
          }
      }
    } finally {
      if (cleaner != null) {
        cleaner.resumeCleaning(deletableLogs.map(_._1))
      }
    }

    debug(s"Log cleanup completed. $total files deleted in " +
                  (time.milliseconds - startMs) / 1000 + " seconds")
  }

上面log.deleteOldSegments() 调用的是Log.scala的deleteOldSegments

  def deleteOldSegments(): Int = {
    if (config.delete) {
      deleteLogStartOffsetBreachedSegments() +
      //删除超过规定大小的
        deleteRetentionSizeBreachedSegments() +
      //删除超过保存时间的
        deleteRetentionMsBreachedSegments()
    } else {
      deleteLogStartOffsetBreachedSegments()
    }
  }
  private def deleteRetentionMsBreachedSegments(): Int = {
    //retention.ms 小于0代表不设置保存时间
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds

    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      startMs - segment.largestTimestamp > config.retentionMs
    }

    deleteOldSegments(shouldDelete, RetentionMsBreach)
  }

  private def deleteRetentionSizeBreachedSegments(): Int = {
    //retention.bytes 小于0代表不删,或者当前topic的此分区的日志大小小于设置的值,也不用删
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    var diff = size - config.retentionSize
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      if (diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }

    deleteOldSegments(shouldDelete, RetentionSizeBreach)
  }
 
  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                reason: SegmentDeletionReason): Int = {
    lock synchronized {
      //deletable,可以任务是可删除的segment集合
      val deletable = deletableSegments(predicate)
      if (deletable.nonEmpty)
        //这里实际执行删除操作
        deleteSegments(deletable, reason)
      else
        0
    }
  }

看上面的代码就知道,如果你配置了按最大超时时间和日志最大存储大小,那两个定时清理都会被执行

(1)deletableSegments(把需要删除的segment加入待删除的集合)
  
  private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
    if (segments.isEmpty) {
      Seq.empty
    } else {
      val deletable = ArrayBuffer.empty[LogSegment]
      val segmentsIterator = segments.values.iterator
      var segmentOpt = nextOption(segmentsIterator)
      while (segmentOpt.isDefined) {
        val segment = segmentOpt.get
        val nextSegmentOpt = nextOption(segmentsIterator)
        //upperBoundOffset 当前Segment最大偏移量, isLastSegmentAndEmpty 是否是最后一个Segment
        val (upperBoundOffset: Long, isLastSegmentAndEmpty: Boolean) =
          nextSegmentOpt.map {
            nextSegment => (nextSegment.baseOffset, false)
          }.getOrElse {
            (logEndOffset, segment.size == 0)
          }
        //如果本segment最大偏移量 <= highWatermark 且在偏函数中返回true 且不是最后一个Segment, 则加入可删除队列中
        if (highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt) && !isLastSegmentAndEmpty) {
          deletable += segment
          segmentOpt = nextSegmentOpt
        } else {
          segmentOpt = Option.empty
        }
      }
      deletable
    }
  }

这里需要注意,在segment加入待删除集合之前,必须有isLastSegmentAndEmpty的判断,代表的是如果此segment是最后一个(出现的条件就是此segment的大小长度还没超过segment.bytes,默认是1G),所以此segment就不会加入待删除的集合
注意: 通过上面的讲解的源码和说明,如果你每个分区的最后一个segment不超过segment.bytes ,那不会被日志清理清理掉

下面的每一个segment有什么,可以看Kafka的Log存储原理再析,下面是我复制过来的一张图

主要是index,log,timeindexs三种文件,

而index和log文件的映射是下面这种

(2) deleteSegments(对待删除的segment集合删除)
private def deleteSegments(deletable: Iterable[LogSegment], reason: SegmentDeletionReason): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        //我们必须始终至少有一个段,所以如果我们要删除所有段,请先创建一个新段
        if (numberOfSegments == numToDelete)
          roll()
        lock synchronized {
          checkIfMemoryMappedBufferClosed()
          // remove the segments for lookups 删除用于查找的段
          removeAndDeleteSegments(deletable, asyncDelete = true, reason)
          maybeIncrementLogStartOffset(segments.firstSegment.get.baseOffset, SegmentDeletion)
        }
      }
      numToDelete
    }
  }

  def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
    maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
      val start = time.hiResClockMs()
      lock synchronized {
        checkIfMemoryMappedBufferClosed()
        //预计下一个偏移量,expectedNextOffset是none则是0,logEndOffset则是最后的offset
        val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)

        val logFile = Log.logFile(dir, newOffset)

        if (segments.contains(newOffset)) { //说明segment的文件已经生成
          // segment with the same base offset already exists and loaded 具有相同基本偏移量的段已存在并已加载
          if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {//如果活跃的segment的startOffset等于newOffset并且活跃的segment的size=0
            // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
            // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0).
            //我们已经看到在 shouldRoll() 为大小为零的活动段返回 true 之后发生这种情况(参见 KAFKA-6388),因为其中一个索引是“满的”(由于 _maxEntries == 0)。
            warn(s"Trying to roll a new log segment with start offset $newOffset " +
                 s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
                 s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
                 s" size of offset index: ${activeSegment.offsetIndex.entries}.")
            //删除activeSegment
            removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true, LogRoll)
          } else {
            throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
                                     s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
                                     s"segment is ${segments.get(newOffset)}.")
          }
        } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {//segments存在并且低于活跃segment的startOffset
          throw new KafkaException(
            s"Trying to roll a new log segment for topic partition $topicPartition with " +
            s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
        } else {
          val offsetIdxFile = offsetIndexFile(dir, newOffset)
          val timeIdxFile = timeIndexFile(dir, newOffset)
          val txnIdxFile = transactionIndexFile(dir, newOffset)

          for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
            warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
            Files.delete(file.toPath)
          }

          segments.lastSegment.foreach(_.onBecomeInactiveSegment())
        }

        // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
        // offset align with the new segment offset since this ensures we can recover the segment by beginning
        // with the corresponding snapshot file and scanning the segment data. Because the segment base offset
        // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
        // we manually override the state offset here prior to taking the snapshot.
        //拍摄生产者状态的快照以方便恢复。让快照
        // 偏移量与新的段偏移量对齐很有用,因为这样可以确保我们可以通过
        // 从相应的快照文件开始并扫描段数据来恢复段。因为段基偏移量
        // 实际上可能在当前生产者状态结束偏移量之前(对应于日志结束偏移量),
        // 我们在拍摄快照之前手动覆盖了这里的状态偏移量。
        producerStateManager.updateMapEndOffset(newOffset)
        producerStateManager.takeSnapshot()

        val segment = LogSegment.open(dir,
          baseOffset = newOffset,
          config,
          time = time,
          //segment.bytes 规定segment的大小
          initFileSize = config.initFileSize,
          preallocate = config.preallocate)
        //添加新的segment
        addSegment(segment)

        // We need to update the segment base offset and append position data of the metadata when log rolls.
        // The next offset should not change.
        //我们需要在日志滚动时更新段基偏移量并附加元数据的位置数据。  下一个偏移量不应该改变
        updateLogEndOffset(nextOffsetmetadata.messageOffset)

        // schedule an asynchronous flush of the old segment 安排旧段的异步刷新
        scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)

        info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")

        segment
      }
    }
  }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775766.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号