柚子快報(bào)激活碼778899分享:分布式 kafka日志存儲(chǔ)
柚子快報(bào)激活碼778899分享:分布式 kafka日志存儲(chǔ)
前言
kafka的主題(topic)可以對(duì)應(yīng)多個(gè)分區(qū)(partition),而每個(gè)分區(qū)(partition)可以有多個(gè)副本(replica),我們提生產(chǎn)工單創(chuàng)建topic的時(shí)候也是要預(yù)設(shè)這些參數(shù)的。但是它究竟是如何存儲(chǔ)的呢?
我們?cè)谑褂胟afka發(fā)送消息時(shí),實(shí)際表現(xiàn)是提交日志,日志記錄會(huì)一個(gè)接一個(gè)地追加到日志的末尾,同時(shí)為了避免單一日志文件過大無線膨脹,kafka采用了日志分段(LogSegment)的形式進(jìn)行存儲(chǔ)。所謂日志分段,就是當(dāng)一個(gè)日志文件大小到達(dá)一定條件之后,就新建一個(gè)新的日志分段,然后在新的日志分段寫入數(shù)據(jù)。每個(gè)日志段對(duì)象會(huì)在磁盤上創(chuàng)建一組文件,包括消息日志文件(.log)、位移索引文件(.index)、時(shí)間戳索引文件 (.timeindex),也就是說日志段才是kafka真正的日志文件存儲(chǔ)基礎(chǔ)單元。
整個(gè)主題、分區(qū)、副本、日志關(guān)系如下:
以__consumer_offsets這個(gè)topic為例,每一個(gè)目錄對(duì)應(yīng)一個(gè)分區(qū),說明dev環(huán)境下這個(gè)topic有50個(gè)分區(qū),每個(gè)子目錄下存在多組日志段,也就是多組.log、.index、.timeindex 文件組合。
進(jìn)入/tmp/kafka-logs/__consumer_offset-49目錄,下圖中文件名的一串?dāng)?shù)字0是該日志段的起始位移值(Base Offset),也就是該日志段中所存的第一條消息的位移值,由此也可以推測(cè)出0000000000000000000.log中共有20條日志記錄
配置
前面是從生產(chǎn)配置和kafka目錄的文件直觀看到消息相關(guān)的內(nèi)容,下面引入幾個(gè)kafka消息相關(guān)的配置。
日志清理策略
kafka log的清理策略有兩種:delete,compact,默認(rèn)是delete
delete:一般是使用按照時(shí)間保留的策略,當(dāng)不活躍的segment的時(shí)間戳是大于設(shè)置的時(shí)間的時(shí)候,當(dāng)前segment就會(huì)被刪除compact: 日志不會(huì)被刪除,會(huì)被去重清理,這種模式要求每個(gè)record都必須有key,然后kafka會(huì)按照一定的時(shí)機(jī)清理segment中的key,對(duì)于同一個(gè)key只保留最新的那個(gè)key。同樣的,compact也只針對(duì)不活躍的segment 對(duì)應(yīng)的配置是log.cleanup.policy: delete,對(duì)應(yīng)topic級(jí)別的配置是cleanup.policy
消息保存時(shí)長(zhǎng)
Kafka 支持服務(wù)器級(jí)保留策略,我們可以通過配置三個(gè)基于時(shí)間的配置屬性之一來調(diào)整該策略:
log.retention.hours log.retention.minutes log.retention.ms 其默認(rèn)配置是log.retention.hours=168,即默認(rèn)保留7天, Kafka自身會(huì)用較高精度值覆蓋較低精度值。因此,如果在配置中新增log.retention.minutes=10,消息的保留時(shí)間將會(huì)變更位10分鐘 上面這個(gè)配置是服務(wù)器級(jí)別的,配置在server.properties中,每次新增創(chuàng)建topic時(shí),如果不指定topic的日志保留時(shí)間,以上述配置為例,消息的保留時(shí)長(zhǎng)就是7天,如果配置retention.ms=600000,這是從log.retention.minutes派生而來的,這個(gè)參數(shù)是topic級(jí)別的,配置了這個(gè)值,就會(huì)以這個(gè)值為準(zhǔn),創(chuàng)建topic之后,仍然可以單獨(dú)調(diào)整retention.ms,來調(diào)整topic的保留時(shí)間
segment相關(guān)配置
segment有兩個(gè)很重要的配置
log.segment.bytes log.roll.hours 目前風(fēng)控kafka上述兩個(gè)配置都是默認(rèn)值,見下圖,log.segment.bytes是1G,log.roll.hours是7d,這兩個(gè)配置在后續(xù)分析segment相關(guān)原理時(shí)還會(huì)再詳細(xì)介紹 segment掃描頻率的配置,日志片段文件檢查的周期時(shí)間,目前生產(chǎn)配置為5min
log.retention.check.interval.ms
原理
為什么要看源碼?因?yàn)闄z索了kafka的官方文檔,關(guān)于segment的單獨(dú)說明極少,網(wǎng)上檢索到的資料又沒有足夠的說服力,所以最終決定還是從源碼中尋找相關(guān)問題的答案
kafka源碼搭建 當(dāng)前風(fēng)控系統(tǒng)使用的kafka是1.1版本,隨機(jī)下載了相關(guān)源碼,由于kafka是用scala編寫的,并用gradle進(jìn)行打包處理,也進(jìn)行了相關(guān)程序的下載,對(duì)應(yīng)的版本見下述列出,感興趣的同事可以按如下版本下載,解壓編譯kafka源碼,然后就可以在idea中查看了
kafka源碼版本:1.1scala版本:2.12.7gradle版本:4.7
LogConfig
該scala定義了Defaults object,scala中的Object可以看成java中的util類,存放了很多常量
log.segment.bytes: 1GBlog.roll.ms: 168hourlog.retention.bytes: -1log.retention.ms: 168hour
LogSegment
正如前文中介紹的,segment才是kafka存儲(chǔ)單元的基礎(chǔ)部分,隨之找到了相關(guān)類LogSegment.scala
變量聲明
它的參數(shù)定義如下,可以查看它的注釋,明確的支出了segment是由log和index組成的,這也與我們前面查看kafka目錄中對(duì)應(yīng)的日志文件呼應(yīng)了
/**
* A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
* the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
* segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
* any previous segment.
*
* A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
*
* @param log The message set containing log entries
* @param offsetIndex The offset index
* @param timeIndex The timestamp index
* @param baseOffset A lower bound on the offsets in this segment
* @param indexIntervalBytes The approximate number of bytes between entries in the index
* @param time The time instance
*/
@nonthreadsafe
@nonthreadsafe
class LogSegment private[log] (val log: FileRecords,
val offsetIndex: OffsetIndex,
val timeIndex: TimeIndex,
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val maxSegmentMs: Long,
val maxSegmentBytes: Int,
val time: Time) extends Logging {
.......
}
針對(duì)segment的成員變量,重點(diǎn)看以下幾個(gè)
baseOffset:消息偏移量,即文件名,對(duì)于一組sgement,它都是固定的,它就是該日志段中第一條消息的位移值,一共20位,不足的話前面補(bǔ)0,每個(gè)日志段對(duì)象保存自己的起始位移 baseOffset,這是非常重要的屬性,在源碼中經(jīng)??吹剿氖褂?!事實(shí)上,你在磁盤上看到的文件名就是 baseOffset 的值。每個(gè) LogSegment 對(duì)象實(shí)例一旦被創(chuàng)建,它的起始位移就是固定的了,不能再被更改。maxSegmentBytes:每段最大字節(jié)數(shù),該參數(shù)越大,日志被切成的segment就越少,控制粒度也就變小了,通過代碼debug發(fā)現(xiàn)這個(gè)參數(shù)取決于配置【log.segment.bytes】,目前風(fēng)控系統(tǒng)中配置為1GmaxSegmentMs:每段保留有效毫秒數(shù),每個(gè)segment在寫入一段時(shí)間的日志后,即使log還沒有達(dá)到maxSegmentBytes最大值,kafka也會(huì)強(qiáng)制日志滾動(dòng),以確保可以刪除或者壓縮舊數(shù)據(jù),該參數(shù)取決于【log.roll.ms】或【log.roll.hours】,當(dāng)前者不存在時(shí),取后者,后者目前在生產(chǎn)環(huán)境中配置為168hrollJitterMs:是日志段對(duì)象新增倒計(jì)時(shí)的“擾動(dòng)值”。因?yàn)槟壳?Broker 端日志段新增倒計(jì)時(shí)是全局設(shè)置,這就是說,在未來的某個(gè)時(shí)刻可能同時(shí)創(chuàng)建多個(gè)日志段對(duì)象,這將極大地增加物理磁盤 I/O 壓力。有了 rollJitterMs 值的干擾,每個(gè)新增日志段在創(chuàng)建時(shí)會(huì)彼此岔開一小段時(shí)間,這樣可以緩解物理磁盤的 I/O 負(fù)載瓶頸。這個(gè)變量給我的感覺有點(diǎn)像設(shè)置緩存時(shí)間加的隨機(jī)值,避免緩存同時(shí)過期。
shouldRoll方法
segment是否應(yīng)該進(jìn)行切分(roll)
def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
size > maxSegmentBytes - messagesSize ||
(size > 0 && reachedRollMs) ||
offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages)
}
timeWaitedForRoll(now, maxTimestampInMessages):1. 如果此segment的第一個(gè)消息的時(shí)間戳存在,就用當(dāng)前的新的batch的時(shí)間戳,減去此segment第一條消息的的時(shí)間戳判斷是否已經(jīng)超過segments.ms,2. 如果此segments的第一個(gè)消息的時(shí)間戳不存在,就用當(dāng)前系統(tǒng)時(shí)間與此segment創(chuàng)建的時(shí)間差做判斷。reachedRollMs就表示,是否超過上述日志寫入事件差值是否超過【log.roll.hours】size > maxSegmentBytes - messagesSize:當(dāng)前 activeSegment 在追加本次消息之后,長(zhǎng)度超過 LogSegment 允許的最大值【log.segment.bytes】offsetIndex.isFull || timeIndex.isFull:索引文件是否滿了!canConvertToRelativeOffset(maxOffsetInMessages):這個(gè)變量涉及到offset的相對(duì)位移概念,后面再介紹
append方法
// Update the in memory max timestamp and corresponding offset.
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
}
我這次分享的重點(diǎn)都是跟segment的寫入和刪除相關(guān)的,所以只重點(diǎn)介紹與之相關(guān)的內(nèi)容,下面的源碼解析也是這個(gè)思路
在append方法中,即往segment寫入消息時(shí),也會(huì)同步更新segment的最大時(shí)間戳以及最大時(shí)間戳所屬消息的位移值屬性。每個(gè)日志段都要保存當(dāng)前最大時(shí)間戳信息和所屬消息的位移信息。在 Broker 端的提供定期刪除日志功能中,比如我只想保留最近 7 天的日志,此處的當(dāng)前最大時(shí)間戳這個(gè)值就是判斷的依據(jù);
Log
Log 對(duì)象是 Kafka 源碼(特別是 Broker 端)最核心的部分,沒有之一。
日志是日志段的容器,里面定義了很多管理日志段的操作。
object Log
“.deleted”
.deleted 是刪除日志段操作創(chuàng)建的文件。目前刪除日志段文件是異步操作,Broker 端把日志段文件從.log 后綴修改為.deleted 后綴。如果你看到一大堆.deleted 后綴的文件名,別慌,這是 Kafka 在執(zhí)行日志段文件刪除。
filenamePrefixFromOffset
def filenamePrefixFromOffset(offset: Long): String = {
val nf = NumberFormat.getInstance()
nf.setMinimumIntegerDigits(20)
nf.setMaximumFractionDigits(0)
nf.setGroupingUsed(false)
nf.format(offset)
}
這個(gè)方法的作用是通過給定的位移值計(jì)算出對(duì)應(yīng)的日志段文件名。Kafka 日志文件固定是20 位的長(zhǎng)度,filenamePrefixFromOffset 方法就是用前面補(bǔ) 0 的方式,把給定位移值擴(kuò)充成一個(gè)固定 20 位長(zhǎng)度的字符串。
Log Class
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
為什么說Log是管理segment的容器,從這個(gè)字段就可以看出來,這是 Log 類中最重要的屬性之一。它保存了分區(qū)日志下所有的日志段信息,只不過是用 Map 的數(shù)據(jù)結(jié)構(gòu)來保存的。Map 的 Key 值是日志段的起始位移值,Value 則是日志段對(duì)象本身。Kafka 源碼使用 ConcurrentNavigableMap 數(shù)據(jù)結(jié)構(gòu)來保存日志段對(duì)象,就可以很輕松地利用該類提供的線程安全和各種支持排序的方法,來管理所有日志段對(duì)象。
它是鍵值(Key)可排序的 Map。Kafka 將每個(gè)日志段的起始位移值作為 Key,這樣一來,我們就能夠很方便地根據(jù)所有日志段的起始位移值對(duì)它們進(jìn)行排序和比較,同時(shí)還能快速地找到與給定位移值相近的前后兩個(gè)日志段。
Log中刪除Segment操作
三個(gè)留存策略
/**
* Delete any log segments that have either expired due to time based retention
* or because the log size is > retentionSize
*/
def deleteOldSegments(): Int = {
if (!config.delete) return 0
deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
}
config.delete:配置的刪除策略,要配置delete才會(huì)進(jìn)行真實(shí)的刪除操作
deleteRetentionMsBreachedSegments:其中的核心條件是(segment, _) => startMs - segment.largestTimestamp > config.retentionMs,這是一個(gè)匿名函數(shù),startMs是當(dāng)前時(shí)間,largestTimestamp 正是上文提到的LogSegment在每次寫入日志時(shí)都會(huì)修改的最大時(shí)間戳,config.retentionMs也是上文中提到過的配置【retention.ms】,即topic的保留時(shí)間
deleteRetentionSizeBreachedSegments:這個(gè)刪除是受segment驅(qū)使,但是有個(gè)條件retention.bytes>0,但是目前生產(chǎn)環(huán)境這個(gè)配置是-1,所以生產(chǎn)環(huán)境的kafka實(shí)際是不會(huì)觸發(fā)超過指定大小后的刪除策略的
deleteLogStartOffsetBreachedSegments:下一個(gè)日志分段的起始偏移量 baseOffset 是否小于等于 logStartOffset,若是,則可以刪除此日志分段
消息量很少的情況
知道了上述刪除策略后,重點(diǎn)分析一下當(dāng)某個(gè)topic的消息數(shù)量很少的情況,即在segment在7天內(nèi),都沒有因?yàn)槌^segment的文件上線1G時(shí),該segment則一直是active segment,該topic也只有這唯一一個(gè)segment
假設(shè)這個(gè)topic的過期時(shí)間是15天,第1天產(chǎn)生了1條消息,第7天產(chǎn)生了1條消息,此時(shí)不滿足shouldRoll條件,不會(huì)切分segment,第13天又產(chǎn)生了一條消息,此時(shí)還是不滿足shouldRoll條件,那什么時(shí)候會(huì)進(jìn)行日志切分呢,當(dāng)下一條消息產(chǎn)生的時(shí)間跟上一條消息產(chǎn)生的時(shí)間相差超過了7天,此時(shí)才會(huì)進(jìn)行日志切分,所以存在一種可能性,對(duì)于這種消息產(chǎn)生量很少的日志可能永遠(yuǎn)不會(huì)過期,有點(diǎn)像緩存的續(xù)時(shí),一直給續(xù)上了。
asyncDeleteSegment
private def asyncDeleteSegment(segment: LogSegment) {
segment.changeFileSuffixes("", Log.DeletedFileSuffix)
def deleteSeg() {
info(s"Deleting segment ${segment.baseOffset}")
maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
segment.deleteIfExists()
}
}
scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
}
執(zhí)行segment刪除上一個(gè)異步操作,首先是同步在.log文件后面加上.deleted的后綴,然后通過定時(shí)器scheduler,1分鐘延遲后異步刪除
.log文件和.index文件
查看文件詳情
查看log文件,以__consumer_offset topic為例 /data/app/kafka_2.12-2.6.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/__consumer_offsets-49/00000000000000000000.log --print-data-log
可以看到00000000000000000000.log的起始o(jì)ffset是19,最后一條offset是20,然后他的創(chuàng)建時(shí)間是1658900424154
又獲取了00000000000000000021.log的文件詳情,起始o(jì)ffset是21,創(chuàng)建時(shí)間是1659505428096,它與上一個(gè)segment的最后一條消息的時(shí)間差值是605003942,是大于7天的,所以是符合上述源碼分析的,即超過了segment切分時(shí)間配置【log.roll.hours】
柚子快報(bào)激活碼778899分享:分布式 kafka日志存儲(chǔ)
參考文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。