Kafka Core Source Code Analysis - LogManager


LogManager is the broker component responsible for managing Kafka's logs: creating, retrieving, flushing, and cleaning them.

1. LogManager Initialization and Startup

During broker startup, KafkaServer creates the LogManager through its companion object and then starts it:

var logManager: LogManager = null
...
// part of the KafkaServer startup logic is omitted here
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
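
startup() itself is short: it registers LogManager's periodic background jobs with the KafkaScheduler and, if log cleaning is enabled, starts the LogCleaner. The sketch below is condensed and paraphrased from this version of the upstream Kafka source (logging and some details trimmed, so names may differ slightly):

def startup() {
  if (scheduler != null) {
    // delete log segments that violate retention time/size, every retentionCheckMs
    scheduler.schedule("kafka-log-retention", cleanupLogs _,
      delay = InitialTaskDelayMs, period = retentionCheckMs, unit = TimeUnit.MILLISECONDS)
    // flush logs whose flush interval has elapsed, every flushCheckMs
    scheduler.schedule("kafka-log-flusher", flushDirtyLogs _,
      delay = InitialTaskDelayMs, period = flushCheckMs, unit = TimeUnit.MILLISECONDS)
    // persist recovery points to the recovery-point-offset-checkpoint file in each log directory
    scheduler.schedule("kafka-recovery-point-checkpoint", checkpointLogRecoveryOffsets _,
      delay = InitialTaskDelayMs, period = flushRecoveryOffsetCheckpointMs, unit = TimeUnit.MILLISECONDS)
    // persist log start offsets to the log-start-offset-checkpoint file in each log directory
    scheduler.schedule("kafka-log-start-offset-checkpoint", checkpointLogStartOffsets _,
      delay = InitialTaskDelayMs, period = flushStartOffsetCheckpointMs, unit = TimeUnit.MILLISECONDS)
    // physically remove logs queued in logsToBeDeleted (reschedules itself dynamically)
    scheduler.schedule("kafka-delete-logs", deleteLogs _,
      delay = InitialTaskDelayMs, unit = TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner)
    cleaner.startup()
}

All of these tasks share the 30-second InitialTaskDelayMs defined in the class body below, so none of them fires immediately after broker startup.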

1.1. LogManager Initialization

The companion object's apply() builds the broker-wide default LogConfig, reads per-topic overrides from ZooKeeper, and passes the relevant broker configuration values to the LogManager constructor:

object LogManager {

  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
  val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000

  def apply(config: KafkaConfig,
            initialOfflineDirs: Seq[String],
            zkClient: KafkaZkClient,
            brokerState: BrokerState,
            kafkaScheduler: KafkaScheduler,
            time: Time,
            brokerTopicStats: BrokerTopicStats,
            logDirFailureChannel: LogDirFailureChannel): LogManager = {
    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
    val defaultLogConfig = LogConfig(defaultProps)

    // read the log configurations from zookeeper
    val (topicConfigs, failed) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
    if (!failed.isEmpty) throw failed.head._2

    val cleanerConfig = LogCleaner.cleanerConfig(config)

    new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
      initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
      topicConfigs = topicConfigs,
      initialDefaultConfig = defaultLogConfig,
      cleanerConfig = cleanerConfig,
      recoveryThreadsPerDataDir = config.numRecoveryThreadsPerDataDir,
      flushCheckMs = config.logFlushSchedulerIntervalMs,
      flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
      flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
      retentionCheckMs = config.logCleanupIntervalMs,
      maxPidExpirationMs = config.transactionIdExpirationMs,
      scheduler = kafkaScheduler,
      brokerState = brokerState,
      brokerTopicStats = brokerTopicStats,
      logDirFailureChannel = logDirFailureChannel,
      time = time)
  }
}
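
The two file-name constants at the top of the companion object refer to small per-log-directory checkpoint files: recovery-point-offset-checkpoint records, for each partition, the offset up to which messages are known to be flushed to disk, and log-start-offset-checkpoint records each partition's current log start offset. Both are written through OffsetCheckpointFile in a plain-text layout: a version line, an entry-count line, then one "topic partition offset" line per partition. As a rough standalone illustration of that layout (the CheckpointReader helper below is hypothetical and not part of Kafka):

import scala.io.Source

// Hypothetical helper: parse a checkpoint file written in the
// "version / entry-count / <topic> <partition> <offset>" layout used by OffsetCheckpointFile.
object CheckpointReader {
  def read(path: String): Map[(String, Int), Long] = {
    val source = Source.fromFile(path)
    try {
      val lines = source.getLines().toVector
      val version = lines(0).trim.toInt   // expected to be 0 in this layout
      val expected = lines(1).trim.toInt  // number of entries that should follow
      require(version == 0, s"unsupported checkpoint version $version")
      val entries = lines.drop(2).map { line =>
        val Array(topic, partition, offset) = line.trim.split("\\s+")
        (topic, partition.toInt) -> offset.toLong
      }
      require(entries.size == expected, s"expected $expected entries, found ${entries.size}")
      entries.toMap
    } finally source.close()
  }
}

// e.g. CheckpointReader.read("/var/kafka-logs/recovery-point-offset-checkpoint")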

/**
 * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
 * All read and write operations are delegated to the individual log instances.
 *
 * The log manager maintains logs in one or more directories. New logs are created in the data directory
 * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
 * size or I/O rate.
 *
 * A background thread handles log retention by periodically truncating excess log segments.
 */
@threadsafe
class LogManager(logDirs: Seq[File],
                 initialOfflineDirs: Seq[File],
                 val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
                 val initialDefaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 recoveryThreadsPerDataDir: Int,
                 val flushCheckMs: Long,
                 val flushRecoveryOffsetCheckpointMs: Long,
                 val flushStartOffsetCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 val maxPidExpirationMs: Int,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 brokerTopicStats: BrokerTopicStats,
                 logDirFailureChannel: LogDirFailureChannel,
                 time: Time) extends Logging with KafkaMetricsGroup {

  import LogManager._

  val LockFile = ".lock"
  val InitialTaskDelayMs = 30 * 1000

  private val logCreationOrDeletionLock = new Object
  private val currentLogs = new Pool[TopicPartition, Log]()
  // Future logs are put in the directory with "-future" suffix. Future log is created when user wants to move replica
  // from one log directory to another log directory on the same broker. The directory of the future log will be renamed
  // to replace the current log of the partition after the future log catches up with the current log
  private val futureLogs = new Pool[TopicPartition, Log]()
  // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
  private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()

  private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
  @volatile private var _currentDefaultConfig = initialDefaultConfig
  @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir

  def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
    this._currentDefaultConfig = logConfig
  }

  def currentDefaultConfig: LogConfig = _currentDefaultConfig

  def liveLogDirs: Seq[File] = {
    if (_liveLogDirs.size == logDirs.size)
      logDirs
    else
      _liveLogDirs.asScala.toBuffer
  }

  private val dirLocks = lockLogDirs(liveLogDirs)
  @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
    (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
  @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
    (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap

  private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()

  private def offlineLogDirs: Iterable[File] = {
    val logDirsSet = mutable.Set[File](logDirs: _*)
    _liveLogDirs.asScala.foreach(logDirsSet -=)
    logDirsSet
  }

  loadLogs()

  // public, so we can access this from kafka.admin.DeleteTopicTest
  val cleaner: LogCleaner =
    if (cleanerConfig.enableCleaner)
      new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
    else
      null

  val offlineLogDirectoryCount = newGauge("OfflineLogDirectoryCount",
    new Gauge[Int] {
      def value = offlineLogDirs.size
    })

  for (dir <- logDirs) {
    newGauge("LogDirectoryOffline",
      new Gauge[Int] {
        def value = if (_liveLogDirs.contains(dir)) 0 else 1
      },
      Map("logDirectory" -> dir.getAbsolutePath))
  }

  // other method definitions omitted
}
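
One detail from the scaladoc is worth highlighting: when a new log is created, LogManager places it in the live log directory that currently holds the fewest logs (a preferred directory recorded in the preferredLogDirs map above, used when moving a replica between directories on the same broker, takes precedence). A minimal standalone sketch of the "fewest logs wins" policy, with hypothetical names standing in for LogManager's internal state:

import java.io.File

// Hypothetical illustration of the placement policy described in the scaladoc above;
// liveDirs and logsPerDir stand in for LogManager's live directories and per-directory log counts.
def pickLogDir(liveDirs: Seq[File], logsPerDir: Map[File, Int]): File = {
  // directories with no logs yet count as 0, so a newly added disk is filled first
  liveDirs.minBy(dir => logsPerDir.getOrElse(dir, 0))
}

// e.g. pickLogDir(Seq(new File("/data1"), new File("/data2")),
//                 Map(new File("/data1") -> 12, new File("/data2") -> 9))  // -> /data2

In the real LogManager, the per-directory counts come from grouping the existing logs by their parent directory, with empty live directories counted as zero.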
