欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)激活碼778899分享:Kafka 小結(jié)

柚子快報(bào)激活碼778899分享:Kafka 小結(jié)

http://yzkb.51969.com/

Kafka 是由 Linkedin 開(kāi)發(fā)并開(kāi)源的分布式消息系統(tǒng),因其分布式及高吞吐率而被廣泛使用,現(xiàn)已與 Cloudera Hadoop、Apache Storm、Apache Spark、Flink 集成。

Kafka 使用場(chǎng)景

頁(yè)面訪問(wèn)量 PV、頁(yè)面曝光 Expose、頁(yè)面點(diǎn)擊 Click 等行為事件實(shí)時(shí)計(jì)算中的 Kafka Source、Dataflow Pipeline業(yè)務(wù)的消息系統(tǒng),通過(guò)發(fā)布訂閱消息解耦多組微服務(wù),消除峰值(流入的速度和持久化落盤的速度的差速,流入多,消費(fèi)慢,用于做消息堆積,將流量平滑到下游的消費(fèi)系統(tǒng))

Kafka 是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)

以時(shí)間復(fù)雜度為 O(1) 的方式提供消息持久化能力,即使對(duì) TB 級(jí)以上的數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪問(wèn)性能高吞吐率,即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒 100K 條以上消息的傳輸支持 Kafka Server 間的消息分區(qū),以及分布式消費(fèi),同時(shí)保證每個(gè) Partition 內(nèi)的消息順序傳輸同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理Scale out,支持在線水平擴(kuò)展

為何使用消息系統(tǒng)

解耦

消息系統(tǒng)在處理過(guò)程中間插入了一個(gè)隱含的、基于數(shù)據(jù)的接口層,兩邊的處理過(guò)程都要實(shí)現(xiàn)這一接口。這允許你獨(dú)立地?cái)U(kuò)展或修改兩邊的處理過(guò)程,只要確保它們遵守同樣的接口約束。

而基于消息發(fā)布/訂閱機(jī)制,可以聯(lián)動(dòng)多個(gè)業(yè)務(wù)下游子系統(tǒng),能夠在不侵入的情況下分步編排和開(kāi)發(fā),保證數(shù)據(jù)的一致性。

冗余

有些情況下,處理數(shù)據(jù)的過(guò)程可能會(huì)失敗,除非數(shù)據(jù)被持久化,否則將造成丟失。消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過(guò)這一方式規(guī)避了數(shù)據(jù)丟失的風(fēng)險(xiǎn)。

許多消息隊(duì)列所采用的「插入 - 獲取 - 刪除」范式中,在把一個(gè)消息從隊(duì)列中刪除之前,需要處理系統(tǒng)明確地指出該消息已經(jīng)被處理完畢,從而確保數(shù)據(jù)被安全地保存直到使用完畢。

擴(kuò)展性

因?yàn)橄㈥?duì)列解耦了處理過(guò)程,所以增大消息入隊(duì)和處理的頻率是很容易的,只要另外增加處理過(guò)程即可。不需要改變代碼、不需要調(diào)節(jié)參數(shù),擴(kuò)展就像調(diào)大電力按鈕一樣簡(jiǎn)單。

靈活性 & 峰值處理能力

在訪問(wèn)量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見(jiàn)。如果為以能處理這類峰值訪問(wèn)為標(biāo)準(zhǔn),來(lái)投入資源隨時(shí)待命,無(wú)疑是巨大的浪費(fèi)。

使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問(wèn)壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷請(qǐng)求而完全崩潰。

可恢復(fù)性

系統(tǒng)的一部分組件失效時(shí),不會(huì)影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。

順序保證

在大多數(shù)使用場(chǎng)景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊(duì)列本來(lái)就是排序的,并且能夠保證數(shù)據(jù)會(huì)按照特定的順序來(lái)處理。 Kafka 保證一個(gè) Partition 內(nèi)的消息的有序性。

緩沖

在任何重要的系統(tǒng)中,都會(huì)有需要不同處理時(shí)間的元素。消息隊(duì)列通過(guò)一個(gè)緩沖層來(lái)幫助任務(wù)最高效率地執(zhí)行,寫入隊(duì)列的處理會(huì)盡可能地快速。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過(guò)系統(tǒng)的速度。

異步通訊

很多時(shí)候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時(shí)候再去處理它們。

Topic & Partition

Topic

Topic 在邏輯上可以被認(rèn)為是一個(gè) queue,每條消費(fèi)都必須指定它的 Topic,可以簡(jiǎn)單地理解為必須指明把這條消息放進(jìn)哪個(gè) queue 里。

我們把一類消息按照主題來(lái)分類,有點(diǎn)類似于數(shù)據(jù)庫(kù)中的表。

為了使得 Kafka 的吞吐率可以線性提高,物理上把 Topic 分成一個(gè)或多個(gè) Partition。對(duì)應(yīng)到系統(tǒng)上就是一個(gè)或若干個(gè)目錄。

Broker

Kafka 集群包含一個(gè)或多個(gè)服務(wù)器,每個(gè)服務(wù)器節(jié)點(diǎn)稱為一個(gè) Broker。

Broker 存儲(chǔ) Topic 的數(shù)據(jù)。如果某個(gè) Topic 有 N 個(gè) Partition,集群有 N 個(gè) Broker,那么每個(gè) Broker 存儲(chǔ)該 Topic 的一個(gè) Partition。

從 Scale out 的性能角度思考,通過(guò)更多節(jié)點(diǎn),帶來(lái)更多的存儲(chǔ),建立更多的 Partition,把 I/O 負(fù)載到更多的物理節(jié)點(diǎn)上,從而提高總吞吐 IOPS從 Scale up 的角度思考,一個(gè) Node 擁有越多的 Physical Disk,也可以負(fù)載更多的 Partition,從而提升總吞吐 IOPS

如果某個(gè) Topic 有 N 個(gè) Partition,集群有(N + M)個(gè) Broker,那么其中有 N 個(gè) Broker 存儲(chǔ)該 Topic 的一個(gè) Partition,剩下的 M 個(gè) Broker 不存儲(chǔ)該 Topic 的 Partition 數(shù)據(jù)如果某個(gè) Topic 有 N 個(gè) Partition,集群中 Broker 的數(shù)目少于 N 個(gè),那么一個(gè) Broker 存儲(chǔ)該 Topic 的一個(gè)或多個(gè) Partition

Topic 只是一個(gè)邏輯概念,真正在 Broker 間分布的是 Partition。

每一條消息被發(fā)送到 Broker 中,會(huì)根據(jù) Partition 規(guī)則,選擇被存儲(chǔ)到哪一個(gè) Partition。如果 Partition 規(guī)則設(shè)置的合理,那么所有的消息都可以均勻分布到不同的 Partition 中。

實(shí)驗(yàn)條件:3 個(gè) Broker,1 個(gè) Topic,無(wú) Replication,異步模式,3 個(gè) Producer,消息 Payload 為 100 字節(jié)。

當(dāng) Partition 數(shù)量小于 Broker 個(gè)數(shù)時(shí),Partition 數(shù)量越大,吞吐率越高,且呈線性提升Kafka 會(huì)將所有 Partition 均勻分布到所有 Broker 上,所以當(dāng)只有 2 個(gè) Partition 時(shí),會(huì)有 2 個(gè) Broker 為該 Topic 服務(wù);當(dāng)有 3 個(gè) Partition 時(shí),同理會(huì)有 3 個(gè) Broker 為該 Topic 服務(wù)當(dāng) Partition 數(shù)量多于 Broker 個(gè)數(shù)時(shí),總吞吐量并未有所提升,甚至還有所下降??赡艿脑蚴牵寒?dāng) Partition 數(shù)量為 4 或 5 時(shí),不同的 Broker 上的 Partition 數(shù)量不同,而 Producer 會(huì)將數(shù)據(jù)均勻發(fā)送到各 Partition 上,這就造成各 Broker 的負(fù)載不同,不能最大化集群吞吐量

存儲(chǔ)原理

Kafka 的消息是存在于文件系統(tǒng)之上的,Kafka 高度依賴于文件系統(tǒng)來(lái)存儲(chǔ)和緩存消息利用了操作系統(tǒng)的機(jī)制,當(dāng)磁盤操作發(fā)生時(shí),操作系統(tǒng)會(huì)將主內(nèi)存中未使用的空間作為緩存,以加速磁盤讀寫操作。因此,盡管磁盤的讀寫速度較慢,但通過(guò)操作系統(tǒng)的磁盤緩存機(jī)制,可以大大提高 Kafka 的性能Kafka 利用順序 I/O,以及 Page Cache 達(dá)成超高吞吐任何發(fā)布到 Partition 的消息都會(huì)被追加到 Partition 數(shù)據(jù)文件的尾部,這樣的順序?qū)懘疟P操作讓 Kafka 的效率非常高

Kafka 集群保留所有發(fā)布的 message,不管這個(gè) message 有沒(méi)有被消費(fèi)過(guò),Kafka 提供可配置的保留策略去刪除舊數(shù)據(jù)(還有一種策略是根據(jù)分區(qū)大小來(lái)刪除數(shù)據(jù))。

例如,如果將保留策略設(shè)置為兩天,在 message 寫入后的兩天內(nèi),它可用于消費(fèi),之后它將被丟棄以騰出空間。Kafka 的性能與存儲(chǔ)的數(shù)據(jù)量大小無(wú)關(guān), 所以即使將數(shù)據(jù)存儲(chǔ)很長(zhǎng)一段時(shí)間也是沒(méi)有問(wèn)題的。

Offset 偏移量,每條消息都有一個(gè)當(dāng)前 Partition 下唯一的 64 字節(jié)的 Offset,它相當(dāng)于當(dāng)前分區(qū)第一條消息的偏移量,即第幾條消息。消費(fèi)者可以指定消費(fèi)的位置信息,當(dāng)消費(fèi)者掛掉再重新恢復(fù)的時(shí)候,可以從消費(fèi)位置繼續(xù)消費(fèi)。

假設(shè)現(xiàn)在 Kafka 集群只有一個(gè) Broker,創(chuàng)建了 2 個(gè) Topic:Topic1 和 Topic2,Partition 的數(shù)量分別為 1 和 2,那么根目錄下就會(huì)創(chuàng)建如下三個(gè)文件夾。

在 Kafka 文件存儲(chǔ)中,同一個(gè) Topic 下可以有多個(gè)不同的 Partition,每個(gè) Partition 都是一個(gè)目錄。而每一個(gè)目錄又被平均分配成多個(gè)大小相等的 Segment File,Segment File 又是由 index file 和 data file 組成,它們總是成對(duì)出現(xiàn),后綴 .index 和 .log 分表表示 Segment 的索引文件和數(shù)據(jù)文件。

Segment 是 Kafka 文件存儲(chǔ)的最小單位。Segment 文件命名規(guī)則:Partition 全局的第一個(gè) Segment 從 0 開(kāi)始,后續(xù)每個(gè) Segment 文件名為上一個(gè) Segment 文件的最后一條消息的 Offset 值。

其中以索引文件中元數(shù)據(jù)(3, 497)為例,依次在數(shù)據(jù)文件中表示第 3 個(gè) message(在全局 Partition 中表示第 368769 + 3 = 368772 個(gè) message)以及該消息的物理偏移地址為 497(即該消息的起始位置離數(shù)據(jù)文件開(kāi)頭偏移了 497 個(gè)字節(jié),我們可以通過(guò)該偏移量查詢得到該消息的具體內(nèi)容)。

注意:該 Index 文件并不是從 0 開(kāi)始,也不是每次遞增 1 的(即索引文件中包含的索引記錄并不是連續(xù)的,并且索引條目中存儲(chǔ)的 message 在數(shù)據(jù)文件中的物理偏移量也不是連續(xù)遞增的)。

這是因?yàn)樵?Kafka 中一個(gè) Partition 中可能包含大量的 message,使用傳統(tǒng)的順序遞增方式存儲(chǔ)索引條目,可能會(huì)導(dǎo)致索引文件過(guò)大,占用過(guò)多的存儲(chǔ)空間,所以 Kafka 采取稀疏索引存儲(chǔ)的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。

稀疏索引存儲(chǔ)的思路是每隔一定的字節(jié)數(shù)(例如 1000 字節(jié))在數(shù)據(jù)文件和索引文件中建立一條索引記錄,而不是在每個(gè) message 之間都建立一條索引記錄。這樣做的好處是減小索引文件的大小,進(jìn)而減少索引文件的讀取和寫入開(kāi)銷,提高 Kafka 的讀寫性能。

減少索引占用的存儲(chǔ)空間,稀疏索引只在一定字節(jié)范圍內(nèi),每個(gè)分區(qū)只建立了一定數(shù)量的索引,相比于 Dense Index,可以大大減少索引占用的存儲(chǔ)空間提高索引查詢效率,稀疏索引建立了一定數(shù)量的索引,查詢時(shí)只需查詢這些索引,可以大大減少索引查詢所需的時(shí)間,提高查詢效率另外,Kafka 的數(shù)據(jù)寫入是追加寫入,且數(shù)據(jù)一旦寫入,就不能被修改和刪除。這種寫入方式也保證了索引的基本有序性,從而提高了查詢效率因?yàn)槠湮募麨樯弦粋€(gè) Segment 的最后一條消息的 Offset ,所以當(dāng)需要查找一個(gè)指定 Offset 的 message 時(shí),只需通過(guò)在所有的 Segment 的文件名中進(jìn)行二分查找,就能找到它所歸屬的 Segment,然后在其 Index 文件中找到對(duì)應(yīng)到文件上的物理位置,就能拿出該 message

Kafka 是如何準(zhǔn)確知道 message 的偏移的呢?

這是因?yàn)樵?Kafka 中定義了標(biāo)準(zhǔn)的數(shù)據(jù)存儲(chǔ)結(jié)構(gòu),在 Partition 中的每一條 message 都包含了以下三個(gè)屬性。

Offset:表示 message 在當(dāng)前 Partition 中的偏移量,是一個(gè)邏輯上的值,唯一確定了 Partition 中的一條 message,可以簡(jiǎn)單的認(rèn)為是一個(gè) IDMessageSize:表示 message 內(nèi)容 Data 的大小Data:message 的具體內(nèi)容

例如讀取 Offset = 368776 的 message,需要通過(guò)下面 2 個(gè)步驟查找。

查找 Segment File,其中 00000000000000000000.index 表示最開(kāi)始的文件,起始偏移量 Offset 為 0。第二個(gè)文件00000000000000368769.index 的消息起始偏移量為 368769 + 1 = 368770。其他后續(xù)文件依次類推,以起始偏移量命名并排序這些文件,只要根據(jù) Offset 二分查找文件列表,就可以快速定位到具體文件。 當(dāng) Offset = 368776 時(shí)定位到 00000000000000368769.index | log第二步通過(guò) Segment File 查找 message,通過(guò)第一步定位到 Segment File,當(dāng) Offset = 368776 時(shí),依次定位到00000000000000368769.index 的元數(shù)據(jù)物理位置和 00000000000000368769.log 的物理偏移地址,然后再通過(guò)00000000000000368769.log 順序查找直到 Offset = 368776 為止

Segment Index File 采取稀疏索引存儲(chǔ)方式,減少了索引文件的大小,通過(guò) mmap 可以直接內(nèi)存操作。

mmap(Memory-mapped file)是一種內(nèi)存映射文件機(jī)制,可以將文件數(shù)據(jù)直接映射到進(jìn)程的地址空間中,從而允許程序使用內(nèi)存來(lái)直接訪問(wèn)文件數(shù)據(jù),而無(wú)需將數(shù)據(jù)先讀入內(nèi)存再進(jìn)行訪問(wèn)。通過(guò) mmap,一塊連續(xù)的虛擬內(nèi)存地址被映射到文件的一個(gè)區(qū)域或整個(gè)文件上,將文件數(shù)據(jù)當(dāng)作內(nèi)存映射到了進(jìn)程地址空間。

數(shù)據(jù)能夠被更高效地讀取,因?yàn)樗梢灾苯訌拇疟P讀取到內(nèi)存中,可以避免頻繁地系統(tǒng)調(diào)用,提高文件 I/O 的效率內(nèi)存映射文件允許多個(gè)進(jìn)程同時(shí)操作一個(gè)文件,因?yàn)樗羞M(jìn)程都訪問(wèn)相同的內(nèi)存區(qū)域一旦內(nèi)存映射建立,對(duì)于多數(shù)情況(如文件比較大時(shí)),不能直接將內(nèi)存釋放,限制了系統(tǒng)的可用內(nèi)存由于內(nèi)存映射文件操作基于頁(yè)表,所以頻繁修改內(nèi)存區(qū)域的文件將產(chǎn)生很多的頁(yè)表項(xiàng)更新,增加了內(nèi)存管理的開(kāi)銷

Kafka 高效文件存儲(chǔ)設(shè)計(jì)的特點(diǎn)。

Kafka 把 Topic 中一個(gè) Parition 大文件分成多個(gè)小文件段,通過(guò)多個(gè)小文件段,就容易定期清除或刪除已經(jīng)消費(fèi)完的文件,減少磁盤占用通過(guò)索引信息可以快速定位 message 和確定 response 的最大大小通過(guò) Index 元數(shù)據(jù)全部映射到 memory,可以避免 Segment File 的 I/O 磁盤操作通過(guò)索引文件稀疏存儲(chǔ),可以大幅降低 Index 文件元數(shù)據(jù)占用空間的大小

Kafka 從 0.10.0.0 版本起,為分片日志文件中新增了一個(gè) .timeindex 的索引文件,可以根據(jù)時(shí)間戳定位消息。同樣可以通過(guò)腳本 kafka-dump-log.sh 查看時(shí)間索引的文件內(nèi)容。

首先定位分片,將 1570793423501 與每個(gè)分片的最大時(shí)間戳進(jìn)行對(duì)比(最大時(shí)間戳取時(shí)間索引文件的最后一條記錄時(shí)間,如果時(shí)間為 0 則取該日志分段的最近修改時(shí)間),直到找到大于或等于 1570793423501 的日志分段,因此會(huì)定位到時(shí)間索引文件00000000000003257573.timeindex,其最大時(shí)間戳為 1570793423505重復(fù)使用 Offset 找到 log 文件的步驟

Producer

Producer 發(fā)送消息到 Broker 時(shí),會(huì)根據(jù) Paritition 機(jī)制選擇將其存儲(chǔ)到哪一個(gè) Partition。如果 Partition 機(jī)制設(shè)置合理,所有消息都可以均勻分布到不同的 Partition 里,這樣就實(shí)現(xiàn)了負(fù)載均衡。

指明 Partition 的情況下,直接將給定的 Value 作為 Partition 的值沒(méi)有指明 Partition 但有 Key 的情況下,將 Key 的 Hash 值與分區(qū)數(shù)取余得到 Partition 值既沒(méi)有指明 Partition 也沒(méi)有 Key 的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用都在這個(gè)整數(shù)上自增),將這個(gè)值與可用的分區(qū)數(shù)取余,得到 Partition 值,也就是常說(shuō)的 Round-Robin 輪詢算法

生產(chǎn)者在發(fā)送消息之前需要選擇將消息要發(fā)送到哪個(gè)主題的哪個(gè)分區(qū)中,選擇完畢后將消息添加到批量消息中。批量消息中的所有消息都屬于同一主題和分區(qū)。一旦批量消息中的消息數(shù)量達(dá)到了一定閾值或等待時(shí)間到達(dá)了一定值,就會(huì)觸發(fā)批量消息的發(fā)送。發(fā)送過(guò)程將由生產(chǎn)者的一個(gè)單獨(dú)線程進(jìn)行處理。在這個(gè)過(guò)程中,生產(chǎn)者將批量消息發(fā)送到對(duì)應(yīng)的 Kafka Broker,Broker 將存儲(chǔ)這些消息并返回記錄元數(shù)據(jù)(RecordMetadata)對(duì)象作為響應(yīng)。

RecordMetadata 對(duì)象包含消息的主題、分區(qū)及位移等元數(shù)據(jù)信息。如果發(fā)送成功,則 RecordMetadata 中包含的位移表示消息在對(duì)應(yīng)分區(qū)中的位置。如果發(fā)送失敗,則生產(chǎn)者通常會(huì)重試該消息,直到達(dá)到一定的發(fā)送次數(shù)或超時(shí)時(shí)間。

Kafka 集群中的每個(gè) Partition 都會(huì)向生產(chǎn)者返回 ACK,以保證消息已經(jīng)被正確寫入。如果生產(chǎn)者收到 ACK,則將繼續(xù)發(fā)送下一輪的消息,如果沒(méi)有收到 ACK,則會(huì)進(jìn)行重發(fā)。通過(guò)這種機(jī)制,Kafka 生產(chǎn)者可以確保消息被可靠地發(fā)送到指定的主題和分區(qū)中。

生產(chǎn)者發(fā)送批量消息的目的是為了提高消息發(fā)送的效率。在 Kafka 中,生產(chǎn)者將消息添加到批量消息中,是為了減少單個(gè)消息的網(wǎng)絡(luò)傳輸開(kāi)銷和提高網(wǎng)絡(luò)帶寬的利用率。批量發(fā)送能夠?qū)⒍鄺l消息合并為一條發(fā)送,降低頻繁地網(wǎng)絡(luò)通信帶來(lái)的開(kāi)銷。

具體地,當(dāng)生產(chǎn)者發(fā)送消息到 Kafka 集群中的某個(gè) Partition 后,這些消息將被緩存在生產(chǎn)者的發(fā)送緩沖區(qū)中。為了提高消息發(fā)送的效率,生產(chǎn)者不會(huì)立刻將發(fā)送緩沖區(qū)中的單個(gè)消息發(fā)送出去,而是等待緩沖區(qū)中積累足夠多的消息之后,合并成一批消息進(jìn)行發(fā)送。批量消息中的每條消息都屬于同一個(gè)主題和分區(qū),通過(guò)這種方式可以減少單條消息的網(wǎng)絡(luò)傳輸時(shí)間和傳輸開(kāi)銷,提高消息發(fā)送的吞吐量和效率。

需要注意的是,批量消息的發(fā)送過(guò)程是由生產(chǎn)者的一個(gè)獨(dú)立線程進(jìn)行的,這也意味著批量消息的發(fā)送和單個(gè)消息的發(fā)送可能會(huì)存在一定的延遲差異。

Producer Exactly Once

0.11 版本的 Kafka,引入了冪等性:Producer 不論向 Server 發(fā)送多少重復(fù)數(shù)據(jù),Server 端都只會(huì)持久化一條。

要啟用冪等性,只需將 Producer 的參數(shù)中 enable.idompotence 設(shè)置為 true 即可。

開(kāi)啟冪等性的 Producer 在初始化時(shí)會(huì)被分配一個(gè) PID,發(fā)往同一 Partition 的消息會(huì)附帶 Sequence Number。而 Borker 端會(huì)對(duì)(PID, Partition, SeqNumber)做緩存,當(dāng)具有相同主鍵的消息提交時(shí),Broker 只會(huì)持久化一條。

但是 PID 重啟后就會(huì)發(fā)生變化,同時(shí)不同的 Partition 也具有不同主鍵,所以冪等性無(wú)法保證跨分區(qū)會(huì)話的 Exactly Once。

Consumer

我們可以創(chuàng)建一個(gè)消費(fèi)者實(shí)例去從 Kafka 中讀取消息,并且進(jìn)行檢查,最后產(chǎn)生結(jié)果數(shù)據(jù)。但如果生產(chǎn)者寫入消息的速度比消費(fèi)者讀取的速度快怎么辦呢?這樣隨著時(shí)間的增長(zhǎng),消息堆積會(huì)越來(lái)越嚴(yán)重。

對(duì)于這種場(chǎng)景,我們需要增加多個(gè)消費(fèi)者來(lái)進(jìn)行水平擴(kuò)展。

Kafka 消費(fèi)者是消費(fèi)組的一部分,當(dāng)多個(gè)消費(fèi)者形成一個(gè)消費(fèi)組來(lái)消費(fèi)主題時(shí),每個(gè)消費(fèi)者會(huì)收到不同分區(qū)的消息。

假設(shè)有一個(gè) T1 主題,該主題有 4 個(gè)分區(qū),同時(shí)有一個(gè)消費(fèi)組 G1,這個(gè)消費(fèi)組只有一個(gè)消費(fèi)者 C1。那么消費(fèi)者 C1 將會(huì)收到這 4 個(gè)分區(qū)的消息。

如果增加新的消費(fèi)者 C2 到消費(fèi)組 G1,那么每個(gè)消費(fèi)者將會(huì)分別收到兩個(gè)分區(qū)的消息。

如果增加到 4 個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者將會(huì)分別收到一個(gè)分區(qū)的消息。這時(shí)候每個(gè)消費(fèi)者都處理其中一個(gè)分區(qū),即滿負(fù)載運(yùn)行。

但如果繼續(xù)增加消費(fèi)者到這個(gè)消費(fèi)組,剩余的消費(fèi)者將會(huì)空閑,不會(huì)收到任何消息。

總而言之,可以通過(guò)增加消費(fèi)組的消費(fèi)者來(lái)進(jìn)行水平擴(kuò)展,提升消費(fèi)能力。這也是為什么建議創(chuàng)建主題時(shí)使用比較多的分區(qū)數(shù),這樣可以在消費(fèi)負(fù)載高的情況下增加消費(fèi)者來(lái)提升性能。

另外,消費(fèi)者的數(shù)量不應(yīng)該比分區(qū)數(shù)還多,因?yàn)槎喑鰜?lái)的消費(fèi)者是空閑的,沒(méi)有任何幫助。

如果 C1 處理消息仍然還有瓶頸,我們?nèi)绾蝺?yōu)化和處理?

把 C1 內(nèi)部的消息進(jìn)行二次 sharding,開(kāi)啟多個(gè) goroutine worker 進(jìn)行消費(fèi),為了保障 Offset 提交的正確性,需要使用 watermark 機(jī)制,保障最小的 Offset 保存,才能往 Broker 提交。

Consumer Group

Kafka 一個(gè)很重要的特性就是,只需寫入一次消息,就可以支持任意多的應(yīng)用讀取這個(gè)消息。換句話說(shuō),每個(gè)應(yīng)用都可以讀到全量的消息。而為了使得每個(gè)應(yīng)用都能讀到全量的消息,則應(yīng)用需要有不同的消費(fèi)組。

假如我們新增了一個(gè)新的消費(fèi)組 G2,而這個(gè)消費(fèi)組有兩個(gè)消費(fèi)者。在這個(gè)場(chǎng)景中,消費(fèi)組 G1 和消費(fèi)組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來(lái)說(shuō)它們屬于不同的應(yīng)用。

總而言之,如果應(yīng)用需要讀取全量消息,那么請(qǐng)為該應(yīng)用設(shè)置一個(gè)消費(fèi)組;如果該應(yīng)用消費(fèi)能力不足,那么可以考慮在這個(gè)消費(fèi)組里增加消費(fèi)者。

當(dāng)新的消費(fèi)者加入消費(fèi)組時(shí),它會(huì)消費(fèi)一個(gè)或多個(gè)分區(qū),而這些分區(qū)之前是由其他消費(fèi)者負(fù)責(zé)的。另外,當(dāng)消費(fèi)者離開(kāi)消費(fèi)組(比如重啟、宕機(jī)等)時(shí),它所消費(fèi)的分區(qū)會(huì)分配給其他消費(fèi)者,這種現(xiàn)象稱為重平衡(Rebalance)。

重平衡是 Kafka 一個(gè)很重要的性質(zhì),這個(gè)性質(zhì)保證了高可用和水平擴(kuò)展。不過(guò)在重平衡期間,所有消費(fèi)者都不能消費(fèi)消息,因此會(huì)造成整個(gè)消費(fèi)組短暫的不可用。而且,將分區(qū)進(jìn)行重平衡也會(huì)導(dǎo)致原來(lái)的消費(fèi)者狀態(tài)過(guò)期,從而導(dǎo)致消費(fèi)者需要重新更新?tīng)顟B(tài),這段期間也會(huì)降低消費(fèi)性能。

消費(fèi)者通過(guò)定期發(fā)送心跳(Hearbeat)到一個(gè)作為組協(xié)調(diào)者(Group Coordinator)的 Broker 來(lái)保持在消費(fèi)組內(nèi)存活。這個(gè) Broker 不是固定的,每個(gè)消費(fèi)組都可能不同。當(dāng)消費(fèi)者拉取消息或者提交時(shí),便會(huì)發(fā)送心跳。如果消費(fèi)者超過(guò)一定時(shí)間沒(méi)有發(fā)送心跳,那么它的會(huì)話(Session)就會(huì)過(guò)期,組協(xié)調(diào)者就會(huì)認(rèn)為該消費(fèi)者已經(jīng)宕機(jī),然后觸發(fā)重平衡。

從消費(fèi)者宕機(jī)到會(huì)話過(guò)期是有一定時(shí)間的,這段時(shí)間內(nèi)該消費(fèi)者的分區(qū)都不能進(jìn)行消息消費(fèi)。

通常情況下,我們可以進(jìn)行優(yōu)雅關(guān)閉,消費(fèi)者會(huì)發(fā)送離開(kāi)的消息到組協(xié)調(diào)者,這樣組協(xié)調(diào)者可以立即進(jìn)行重平衡而不需要等待會(huì)話過(guò)期。

在 0.10.1 版本,Kafka 對(duì)心跳機(jī)制進(jìn)行了修改,將發(fā)送心跳與拉取消息進(jìn)行分離,這樣使得發(fā)送心跳的頻率不受拉取的頻率影響。

另外更高版本的 Kafka 支持配置一個(gè)消費(fèi)者多長(zhǎng)時(shí)間不拉取消息但仍然保持存活,這個(gè)配置可以避免活鎖(livelock),即指應(yīng)用沒(méi)有故障但是由于某些原因?qū)е虏荒苓M(jìn)一步消費(fèi)。

但是活鎖也很容易導(dǎo)致連鎖故障,當(dāng)消費(fèi)端下游的組件性能退化時(shí),消息消費(fèi)會(huì)變的很慢,很容易觸發(fā) livelock 的重新均衡機(jī)制,反而影響了吞吐。

Partition 會(huì)為每個(gè) Consumer Group 保存一個(gè)偏移量,用于記錄 Group 消費(fèi)到的位置。

Kafka 從 0.9 版本開(kāi)始將消費(fèi)端的位移信息保存在集群的內(nèi)部主題(__consumer_offsets)中,該主題默認(rèn)為 50 個(gè)分區(qū),每條日志項(xiàng)的格式都是 「TopicPartition, OffsetAndMetadata」,key 為主題分區(qū),主要存放主題、分區(qū)以及消費(fèi)組的信息,value 為 OffsetAndMetadata 對(duì)象,主要包括位移、位移提交時(shí)間、自定義元數(shù)據(jù)等信息。

通過(guò)將消費(fèi)者的偏移量信息保存在內(nèi)部主題中,Kafka 可以實(shí)現(xiàn)偏移量的持久化和可靠性,確保消費(fèi)者在分布式環(huán)境中能夠正確追蹤消費(fèi)位置和進(jìn)行故障恢復(fù)。這些偏移量信息對(duì)于消費(fèi)者組的負(fù)載均衡、重新分配以及實(shí)現(xiàn) Exactly-Once 語(yǔ)義等功能至關(guān)重要。

位移追蹤:消費(fèi)位移主題保存了每個(gè)消費(fèi)者組在每個(gè)分區(qū)上的偏移量,記錄了消費(fèi)者組消費(fèi)消息的位置,可以準(zhǔn)確地追蹤消費(fèi)者的消費(fèi)進(jìn)度組內(nèi)偏移量同步:消費(fèi)位移主題充當(dāng)了消費(fèi)者組中各個(gè)消費(fèi)者之間的協(xié)調(diào)者。消費(fèi)者可以通過(guò)訂閱消費(fèi)位移主題來(lái)同步最新的偏移量信息,以便進(jìn)行負(fù)載均衡、分組再平衡等操作恢復(fù)和恢復(fù)消費(fèi):通過(guò)存儲(chǔ)偏移量信息,消費(fèi)者能夠在崩潰或重啟后重新加入消費(fèi)組,并繼續(xù)從上一次消費(fèi)的位置繼續(xù)消費(fèi)消息

分組協(xié)調(diào)者(Group Coordinator)是一個(gè)服務(wù),Kafka 集群中的每個(gè)節(jié)點(diǎn)在啟動(dòng)時(shí)都會(huì)啟動(dòng)這樣一個(gè)服務(wù),該服務(wù)主要是用來(lái)存儲(chǔ)消費(fèi)分組相關(guān)的元數(shù)據(jù)信息,每個(gè)消費(fèi)組均會(huì)選擇一個(gè)協(xié)調(diào)者來(lái)負(fù)責(zé)組內(nèi)各個(gè)分區(qū)的消費(fèi)位移信息存儲(chǔ),選擇的主要步驟如下。

確定消費(fèi)組的位移信息存入哪個(gè)分區(qū):前面提到默認(rèn)的 __consumer_offsets 主題分區(qū)數(shù)為 50,通過(guò)以下算法可以計(jì)算出對(duì)應(yīng)消費(fèi)組的位移信息應(yīng)該存入哪個(gè)分區(qū),partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount),其中 groupId 為消費(fèi)組的 id,由消費(fèi)端指定,groupMetadataTopicPartitionCount 為主題分區(qū)數(shù)根據(jù) partition 尋找該分區(qū)的 leader 所對(duì)應(yīng)的節(jié)點(diǎn) Broker,該 Broker 的 Coordinator 即為該消費(fèi)組的 Coordinator

Consumer Commit Offset

消費(fèi)端可以通過(guò)設(shè)置參數(shù) enable.auto.commit 來(lái)控制是自動(dòng)提交還是手動(dòng)提交,如果值為 true 則表示自動(dòng)提交,在消費(fèi)端的后臺(tái)會(huì)定時(shí)的提交消費(fèi)位移信息,時(shí)間間隔由 auto.commit.interval.ms(默認(rèn)為 5 秒)。

可能存在重復(fù)的位移數(shù)據(jù)提交到消費(fèi)位移主題中,因?yàn)椴还苁欠裼行碌南M(fèi)記錄,每隔 5 秒就會(huì)往主題中寫入一條消息,這樣就會(huì)產(chǎn)生大量的同 key 消息,而其實(shí)只需要一條,因此需要依賴日志壓縮策略來(lái)清理數(shù)據(jù)重復(fù)消費(fèi),假設(shè)位移提交的時(shí)間間隔為 5 秒,那么在 5 秒內(nèi)如果發(fā)生了 Rebalance,則所有的消費(fèi)者都會(huì)從上一次提交的位移處開(kāi)始消費(fèi),那么期間消費(fèi)的數(shù)據(jù)則會(huì)再次被消費(fèi)

集中 Delivery Guarantee。

讀完消息先 commit 再處理消息。在這種模式下,如果 Consumer 在 commit 后還沒(méi)來(lái)得及處理消息,就 crash 了,那么下次重新開(kāi)始工作后就無(wú)法讀到剛剛已提交而未處理的消息,即 At most once讀完消息先處理再 commit。在這種模式下,如果在處理完消息之后 commit 之前,Consumer 發(fā)生 crash,那么下次重新開(kāi)始工作時(shí)還會(huì)處理剛剛未 commit 的消息,而實(shí)際上該消息已經(jīng)被處理過(guò)了,即 At least once

在一些使用場(chǎng)景中,由于消息具有主鍵并具備冪等性,因此在處理這些消息時(shí)可以達(dá)到 Exactly once 的語(yǔ)義。然而,該說(shuō)法被認(rèn)為有些牽強(qiáng),原因如下。

冪等性與 Exactly once 的區(qū)別:冪等性是指對(duì)于同一條消息的重復(fù)處理,結(jié)果等效于只處理一次。而 Exactly once 是指確保每條消息僅被處理一次,并且不會(huì)產(chǎn)生重復(fù)或丟失主鍵并不能完全保證操作的冪等性:雖然在一些場(chǎng)景中,消息具有主鍵,可以通過(guò)主鍵去判斷并保證冪等性。但是主鍵并不能完全解決所有冪等性的問(wèn)題,因?yàn)樘幚聿僮鞯膬绲刃赃€取決于具體的業(yè)務(wù)邏輯和操作細(xì)節(jié)Delivery guarantee 關(guān)注的是消息被處理的次數(shù):Exactly once 語(yǔ)義通常是指對(duì)于每條消息,保證它被處理的次數(shù)正確。而不同的處理方式可以有不同的結(jié)果,因此我們不應(yīng)該將處理過(guò)程的特性(如冪等性)看作 Kafka 的特性因此,盡管在某些情況下,通過(guò)消息的主鍵和冪等性可以部分滿足 Exactly once 的要求,但這并不代表 Kafka 本身提供了 Exactly once 的機(jī)制。實(shí)際上,在保證消息處理的冪等性方面,仍然需要結(jié)合具體的業(yè)務(wù)邏輯和設(shè)計(jì)來(lái)確保消息處理的正確性和可靠性

Push vs Pull

作為一個(gè)消息系統(tǒng),Kafka 遵循了傳統(tǒng)的方式,選擇由 Producer 向 Broker push 消息,并由 Consumer 從 Broker pull 消息。一些 logging-centric system(指將日志作為系統(tǒng)設(shè)計(jì)和運(yùn)維的核心組成部分的系統(tǒng)),比如 Facebook 的 Scribe 和 Cloudera 的 Flume,采用 push 模式。

事實(shí)上,push 模式 和 pull 模式各有優(yōu)劣。

push 模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由 Broker 決定的。push 模式的目標(biāo)是盡可能地以最快速度傳遞消息,但是這樣很容易造成 Consumer 來(lái)不及處理消息,最典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞而 pull 模式則可以根據(jù) Consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾蕘?lái)消費(fèi)消息

對(duì)于 Kafka 而言,pull 模式更合適。pull 模式可簡(jiǎn)化 Broker 的設(shè)計(jì),Consumer 可自主控制消費(fèi)消息的速率,同時(shí) Consumer 可以自己控制消費(fèi)方式,即可批量消費(fèi)也可逐條消費(fèi),同時(shí)還能選擇不同的提交方式,從而實(shí)現(xiàn)不同的傳輸語(yǔ)義。

pull 模式則可以根據(jù) Consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。但 pull 模式不足之處是,如果 Kafka 中沒(méi)有數(shù)據(jù),消費(fèi)者可能會(huì)陷入循環(huán)中,一直返回空數(shù)據(jù)。

因?yàn)橄M(fèi)者從 Broker 主動(dòng)拉取數(shù)據(jù),需要維護(hù)一個(gè)長(zhǎng)輪詢,針對(duì)這一點(diǎn), Kafka 的消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)會(huì)傳入一個(gè)時(shí)長(zhǎng)參數(shù) timeout。如果當(dāng)前沒(méi)有數(shù)據(jù)可供消費(fèi),Consumer 會(huì)等待一段時(shí)間之后再返回,這段時(shí)長(zhǎng)即為 timeout。

總而言之,Kafka 是采用了 pull 模式來(lái)滿足消費(fèi)者主動(dòng)拉取消息的需求,而生產(chǎn)者則是使用 push 模式將消息推送到 Kafka。

Replication

Kafka 在 0.8 以前的版本中,并不提供 HA 機(jī)制(指系統(tǒng)具備高可用性和容錯(cuò)能力的一種設(shè)計(jì)和實(shí)現(xiàn)方式,旨在保證系統(tǒng)在面對(duì)單點(diǎn)故障或異常情況時(shí)仍能持續(xù)提供服務(wù)),一旦一個(gè)或多個(gè) Broker 宕機(jī),則宕機(jī)期間其上所有的 Partition 都將無(wú)法繼續(xù)提供服務(wù)。若該 Broker 永遠(yuǎn)不能再恢復(fù),亦或磁盤故障,則其上的數(shù)據(jù)將丟失。

在 Kafka 0.8 以前的版本中,是沒(méi)有 Replication 的,一旦某一個(gè) Broker 宕機(jī),則其上所有的 Partition 數(shù)據(jù)都將不可被消費(fèi),這與 Kafka 的數(shù)據(jù)持久性及 Delivery Guarantee 的設(shè)計(jì)目標(biāo)相悖。同時(shí) Producer 都不能再將數(shù)據(jù)存于這些 Partition 中。

如果 Producer 使用同步模式,則 Producer 會(huì)在嘗試重新發(fā)送 message.send.max.retries(默認(rèn)值為 3)次后拋出 Exception,用戶可以選擇停止發(fā)送后續(xù)數(shù)據(jù),也可選擇繼續(xù)發(fā)送。前者會(huì)造成數(shù)據(jù)的阻塞,后者會(huì)造成本應(yīng)發(fā)往該 Broker 的數(shù)據(jù)丟失如果 Producer 使用異步模式,則 Producer 會(huì)嘗試重新發(fā)送 message.send.max.retries(默認(rèn)值為 3)次后記錄該異常,并繼續(xù)發(fā)送后續(xù)數(shù)據(jù),這會(huì)造成數(shù)據(jù)丟失,并且只能通過(guò)日志發(fā)現(xiàn)該問(wèn)題

由此可見(jiàn),在沒(méi)有 Replication 的情況下,一旦某臺(tái)機(jī)器宕機(jī)或者某個(gè) Broker 停止工作,則會(huì)造成整個(gè)系統(tǒng)的可用性降低。隨著集群規(guī)模的增加,整個(gè)集群中出現(xiàn)該類異常的幾率大大增加,因此對(duì)于生產(chǎn)系統(tǒng)而言,Replication 機(jī)制的引入非常重要。

Leader

引入 Replication 之后,同一個(gè) Partition 可能會(huì)有多個(gè) Replica,這時(shí)需要在這些 Replica 之間選出一個(gè) Leader,Producer 和 Consumer 只與這個(gè) Leader 進(jìn)行交互,其它的 Replica 作為 Follower 從 Leader 中復(fù)制數(shù)據(jù)。

因?yàn)樾枰WC同一個(gè) Partition 中的多個(gè) Replica 之間的數(shù)據(jù)一致性(其中一個(gè)宕機(jī)后其它的 Replica 必須要能繼續(xù)服務(wù),并且既不能造成數(shù)據(jù)重復(fù),也不能造成數(shù)據(jù)丟失)。

如果沒(méi)有一個(gè) Leader,所有的 Replica 都可同時(shí)讀/寫數(shù)據(jù),那就需要保證多個(gè) Replica 之間互相(N × N 條通路)同步數(shù)據(jù),數(shù)據(jù)的一致性和有序性非常難保證,大大增加了 Replication 實(shí)現(xiàn)的復(fù)雜性,同時(shí)也增加了出現(xiàn)異常的幾率。而引入 Leader 后,只有 Leader 負(fù)責(zé)數(shù)據(jù)的讀寫,F(xiàn)ollower 只向 Leader 順序 Fetch 數(shù)據(jù)(N 條通路),使得系統(tǒng)更加簡(jiǎn)單且高效。

由于 Kafka 集群目前依賴于 Zookeeper 集群,所以最簡(jiǎn)單最直觀的方案是:所有的 Follower 都在 ZooKeeper 上設(shè)置一個(gè) Watch,一旦 Leader 宕機(jī),則其對(duì)應(yīng)的 ephemeral znode 會(huì)自動(dòng)刪除,此時(shí)所有的 Follower 都嘗試創(chuàng)建該節(jié)點(diǎn),而創(chuàng)建成功者(ZooKeeper 保證了只有一個(gè)能創(chuàng)建成功)即是新的 Leader,其他的 Replica 即為 Follower。

但這種方案可能存在以下缺點(diǎn)。

split-brain(腦裂):這是由 ZooKeeper 的特性所引起的,雖然 ZooKeeper 能保證所有的 Watch 按順序觸發(fā),但并不能保證同一時(shí)刻所有的 Replica "看到"的狀態(tài)是一樣的,這就可能造成不同 Replica 的響應(yīng)不一致herd effect(羊群效應(yīng)):如果宕機(jī)的那個(gè) Broker 上的 Partition 比較多,則會(huì)造成多個(gè) Watch 被觸發(fā),造成集群內(nèi)大量的調(diào)整ZooKeeper 負(fù)載過(guò)重:每個(gè) Replica 都要為此在 ZooKeeper 上注冊(cè)一個(gè) Watch,而當(dāng)集群規(guī)模增加到幾千個(gè) Partition 時(shí),ZooKeeper 的負(fù)載會(huì)過(guò)重

Controller

Kafka 的 Leader Election 方案解決了上述問(wèn)題,它在所有的 Broker 中選出一個(gè) Controller,所有 Partition 的 Leader 選舉都是由 Controller 來(lái)決定的。Controller 會(huì)將 Leader 的改變直接通過(guò) RPC 的方式(比 ZooKeeper Queue 的方式更高效)通知需要為此作為響應(yīng)的 Broker(指通知其他 Broker,告知它們有關(guān) Partition Leader 的變化,并需要它們做出相應(yīng)的響應(yīng))。

其他 Broker 收到通知后,會(huì)根據(jù)新的 Leader 信息做出相應(yīng)的調(diào)整。這可能包括更新本地的元數(shù)據(jù)(比如更新自身負(fù)責(zé)的 Partition 的 Leader 信息),重新分配相關(guān)資源等。

通過(guò)這種方式,Kafka 能夠快速地在集群中進(jìn)行 Leader 選舉,并將新的 Leader 信息迅速傳播給其他 Broker,以保證系統(tǒng)的可靠性和高可用性。

Kafka 集群 Controller 的選舉過(guò)程如下。

每個(gè) Broker(Kafka 服務(wù)器節(jié)點(diǎn))在 ZooKeeper 中的 /controller 路徑上注冊(cè)一個(gè) Watch。此 Watch 用于監(jiān)視 Controller 的變化當(dāng)前的 Controller(即 Leader)出現(xiàn)故障或意外退出時(shí),對(duì)應(yīng)的 /controller 路徑會(huì)失去數(shù)據(jù),這是因?yàn)樵撀窂绞且粋€(gè)臨時(shí)節(jié)點(diǎn)(ephemeral node),它與創(chuàng)建它的 Broker 的會(huì)話相關(guān)聯(lián)。一旦會(huì)話終止,該臨時(shí)節(jié)點(diǎn)將被自動(dòng)刪除當(dāng) ZooKeeper 上的 /controller 路徑消失時(shí),所有"活著"的 Broker 都會(huì)參與競(jìng)選新的 Controller。每個(gè) Broker 都會(huì)創(chuàng)建一個(gè)新的 Controller Path,即在 /controller 路徑上創(chuàng)建一個(gè)新的臨時(shí)節(jié)點(diǎn)ZooKeeper 保證了競(jìng)選過(guò)程的原子性和獨(dú)占性,只會(huì)有一個(gè) Broker 在競(jìng)選中成功,成為新的 Controller,而其他 Broker 則會(huì)競(jìng)選失敗競(jìng)選成功的 Broker 成為新的 Controller,負(fù)責(zé)管理與集群相關(guān)的操作,如 Partition 的 Leader 選舉、自動(dòng)創(chuàng)建和刪除 Topic 等。它還負(fù)責(zé)與其他 Broker 進(jìn)行協(xié)調(diào),以確保集群的正常運(yùn)行競(jìng)選失敗的 Broker 在以上過(guò)程完成后,會(huì)重新注冊(cè) Watch 監(jiān)視新的 /controller 路徑,等待下一次 Controller(Leader)故障和選舉

Kafka Partition Leader 的選舉過(guò)程如下(由 Controller 執(zhí)行)。

從 Zookeeper 中讀取當(dāng)前分區(qū)的所有 ISR(in-sync replicas)集合調(diào)用配置的分區(qū)選擇算法選擇分區(qū)的 Leader

Partition 分布

Kafka 集群中的 Partition(分區(qū))復(fù)制(Replication)默認(rèn)情況下是自動(dòng)分配的。在 Kafka 集群中,每個(gè) Broker 都有均等分配 Partition 的 Leader 機(jī)會(huì)。

假設(shè)創(chuàng)建 1 個(gè) Topic,包含 4 個(gè) Partition,2 個(gè) Replication。

上圖中,箭頭指向?yàn)楦北?,?Partition-0 為例,Broker1 中 Parition-0 為 Leader,Broker2 中 Partition-0 為副本。

當(dāng)集群中新增 2 個(gè)節(jié)點(diǎn),Partition 增加到 6 個(gè)。

上圖中,每個(gè) Broker(按照 BrokerId 有序)依次分配主 Partition,下一個(gè) Broker 則為副本,如此循環(huán)迭代分配,多副本都遵循此規(guī)則。

副本分配算法如下。

將所有 n 個(gè) Broker 和待分配的 i 個(gè) Partition 進(jìn)行排序?qū)⒌?i 個(gè) Partition 分配到第(i mod n)個(gè) Broker 上將第 i 個(gè) Partition 的第 j 個(gè)副本分配到第((i + j) mod n)個(gè) Broker 上

Leader

和大部分的分布式系統(tǒng)一樣,Kafka 處理失敗需要明確定義一個(gè) Broker 是否"活著"。對(duì)于 Kafka 而言,存活包含兩個(gè)條件。

副本所在節(jié)點(diǎn)需要與 ZooKeeper 維持 Session(這個(gè)通過(guò) ZK 的 Heartbeat 機(jī)制來(lái)實(shí)現(xiàn))從副本的最后一條消息的 Offset 需要和主副本的最后一條消息的 Offset 的差值不超過(guò)設(shè)定閾值(replica.lag.max.messages),或者從副本的 LEO 落后于主副本的 LEO 的時(shí)長(zhǎng)不大于設(shè)定閾值(replica.lag.time.max.ms)。官方推薦使用后者進(jìn)行判斷,并在 Kafka 0.10.0 移除了replica.lag.max.messages 參數(shù)

Leader 會(huì)跟蹤與其保持同步的 Replica 列表,該列表稱為 ISR(即 In-Sync Replica)。如果一個(gè) Follower 宕機(jī),或者落后太多,則 Leader 將會(huì)把它從 ISR 中移除,當(dāng)其再次滿足以上條件之后,則又會(huì)被重新加入集合中。

ISR 的引入主要是為了解決同步復(fù)制和異步復(fù)制這兩種方案各自的缺陷。

在同步復(fù)制中,當(dāng) Producer 向 Kafka 集群發(fā)送消息時(shí),要求至少有一個(gè) Replica(副本)確認(rèn)已經(jīng)成功寫入消息,然后返回一個(gè)確認(rèn)給 Producer。這種確認(rèn)確保了數(shù)據(jù)的可靠性,因?yàn)橹挥性谥辽僖粋€(gè)副本寫入成功后才會(huì)返回確認(rèn)。如果無(wú)法滿足至少一個(gè)副本寫入成功的條件,Producer 將會(huì)收到錯(cuò)誤的響應(yīng)。如果有從副本宕機(jī)或者超時(shí),就會(huì)拖慢該副本組的整體性能在異步復(fù)制中,當(dāng) Producer 向 Kafka 集群發(fā)送消息時(shí),不需要等待副本的確認(rèn),而是立即返回一個(gè)確認(rèn)給 Producer。這樣可以提高 Producer 的吞吐量,因?yàn)樗恍枰却_認(rèn)。但是,這種方式存在一定的風(fēng)險(xiǎn),因?yàn)槿绻北驹谏形磳懭胂r(shí)出現(xiàn)故障,數(shù)據(jù)可能會(huì)丟失

在 Kafka 中,默認(rèn)情況下使用異步復(fù)制的方式,用以提高性能和吞吐量。但是,如果數(shù)據(jù)的可靠性是更重要的因素,那么可以選擇使用同步復(fù)制??梢酝ㄟ^(guò)配置 Producer 的屬性來(lái)控制復(fù)制方式的選擇。

綜上所述,無(wú)論是同步復(fù)制還是異步復(fù)制,Kafka 都會(huì)為每個(gè) Partition 維護(hù)一個(gè) In-Sync Replica(ISR)的集合,這是一組已經(jīng)追上了 Leader(領(lǐng)導(dǎo)者)的副本。只有 ISR 中的副本才會(huì)參與消息的讀寫操作。當(dāng)副本無(wú)法追上 Leader 或者發(fā)生故障時(shí),會(huì)被移出 ISR,待恢復(fù)后再次加入 ISR。這樣可以保證數(shù)據(jù)的一致性和可用性。

Replicated log

分布式日志系統(tǒng),主要保證以下兩點(diǎn)。

commit log 不會(huì)丟失commit log 在不同機(jī)器上是一致的

基于主從復(fù)制的 Replicated log 實(shí)現(xiàn)。

raft:基于多數(shù)節(jié)點(diǎn)的 ack,節(jié)點(diǎn)一般稱為 leader/followerpacificA:基于所有節(jié)點(diǎn)的 ack,節(jié)點(diǎn)一般稱為 primary/secondarybookkeeper:基于法定個(gè)數(shù)節(jié)點(diǎn)的 ack,節(jié)點(diǎn)一般稱為 writer/bookie

Kafka 在 Zookeeper 中動(dòng)態(tài)維護(hù)了一個(gè) ISR,ISR 里的所有 Replica 都是已經(jīng)跟上了 Leader,只有 ISR 里的成員才有被選為 Leader 的可能。在這種模式下,對(duì)于 f + 1 個(gè) Replica 而言,一個(gè) Partition 能在保證不丟失已經(jīng) commit 的消息的前提下,容忍 f 個(gè) Replica 的失敗。

在大多數(shù)使用場(chǎng)景中,這種模式是非常有利的。需要注意的是,為了容忍 f 個(gè) Replica 的失敗,Majority Vote 和 ISR 在提交(commit)之前都需要等待至少 f + 1 個(gè) Replica 的確認(rèn)。這是為了確保數(shù)據(jù)的可靠性和一致性。但是,ISR 所需的總 Replica 的個(gè)數(shù)幾乎是 Majority Vote 的一半,因?yàn)樗话呀?jīng)跟上 Leader 的 Replica,而其他的 Replica 則在追趕過(guò)程中或者有可能落后。

而對(duì)于 Producer 而言,它可以選擇是否等待消息 commit,這可以通過(guò) request.required.acks 來(lái)設(shè)置。

0:Producer 發(fā)送消息后,不需要等待任何確認(rèn),直接返回1:Producer 發(fā)送消息后,等待 Leader 確認(rèn)接收成功,然后返回all(或 -1):Producer 發(fā)送消息后,等待 ISR 中的所有 Replica 確認(rèn)接收成功后,才返回

當(dāng)設(shè)置為 all(或 -1)時(shí),這種機(jī)制確保了只要 ISR 中有一個(gè)或以上的 Follower,一條被 commit 的消息就不會(huì)丟失。因?yàn)橹挥斜?ISR 中的所有 Replica 確認(rèn)接收成功后,消息才會(huì)被標(biāo)記為已提交(committed),即使 Leader 在此期間發(fā)生故障,在新的 Leader 選舉完成后,仍然可以保證消息的可靠性。

通過(guò)設(shè)置不同的 request.required.acks 參數(shù)值,Producer 可以根據(jù)不同的需求和對(duì)消息可靠性的要求來(lái)平衡吞吐量和數(shù)據(jù)的一致性。較大的等待確認(rèn)級(jí)別可能會(huì)導(dǎo)致較高的延遲,但能提供更高的可靠性保證。而較小的等待確認(rèn)級(jí)別則可以提供更低的延遲,但可能會(huì)增加消息丟失的風(fēng)險(xiǎn)。

High Watermark & Log End Offset

初始時(shí) Leader 和 Follower 的 HW 和 LEO 都是 0。Leader 中的 remote LEO 指的就是 Leader 端保存的 Follower LEO,也被初始化成 0。此時(shí),Producer 沒(méi)有發(fā)送任何消息給 Leader,而 Follower 已經(jīng)開(kāi)始不斷地給 Leader 發(fā)送 FETCH 請(qǐng)求了,但因?yàn)闆](méi)有任何數(shù)據(jù),因此什么都不會(huì)發(fā)生。

值得一提的是,F(xiàn)ollower 發(fā)送過(guò)來(lái)的 FETCH 請(qǐng)求因?yàn)闆](méi)有數(shù)據(jù),會(huì)暫時(shí)被寄存到 Leader 端的 purgatory 中,等待 500ms(replica.fetch.wait.max.ms 參數(shù))超時(shí)后會(huì)強(qiáng)制完成。倘若在寄存期間 Producer 端發(fā)送過(guò)來(lái)了數(shù)據(jù),那么 Kafka 會(huì)自動(dòng)喚醒該 FETCH 請(qǐng)求,讓 Leader 繼續(xù)處理。

High Watermark(高水位標(biāo)記)和 Log End Offset(日志結(jié)束位置)是 Kafka 中重要的概念,用于跟蹤消息的復(fù)制和消費(fèi)狀態(tài)。

High Watermark(HW):High Watermark 是每個(gè) Partition 中的一個(gè)重要標(biāo)記,表示已經(jīng)被認(rèn)為是"已復(fù)制"和"可安全消費(fèi)"的最高消息偏移量(Offset)。在 Kafka 中,Producer 發(fā)送消息到 Leader,Leader 將這些消息寫入日志并進(jìn)行復(fù)制。當(dāng)所有 Replica 都將消息復(fù)制到其本地日志中,并且其偏移量等于或大于 High Watermark 時(shí),此消息被認(rèn)為是"已復(fù)制"的。High Watermark 表示了可以安全從該 Partition 進(jìn)行消費(fèi)的偏移量Log End Offset(LEO):Log End Offset 是指 Partition 中當(dāng)前日志的最高偏移量。它表示了當(dāng)前日志中最新的消息的偏移量。Producer 生產(chǎn)消息時(shí),消息被追加到 Partition 的日志中,并分配一個(gè)唯一的偏移量。每次寫入新的消息,Log End Offset 都會(huì)增加。消費(fèi)者可以通過(guò)跟蹤 Log End Offset,了解最新可消費(fèi)的消息

High Watermark 和 Log End Offset 的關(guān)系如下。

對(duì)于 Leader 來(lái)說(shuō),High Watermark 是其所有 Follower 中最小的 Log End Offset,并且 Leader 只有在所有 Follower 都復(fù)制了該偏移量之后,才能將 High Watermark 推進(jìn)到新的值對(duì)于 Follower 來(lái)說(shuō),High Watermark 不等于自身的 Log End Offset,而是表示 Leader 的 High Watermark 的位置。Follower 的 Log End Offset 可能會(huì)落后于 Leader 的 High Watermark,這是正常的復(fù)制機(jī)制

High Watermark 的存在保證了消息的可靠性和一致性。只有當(dāng)消息被所有的 Replica 都復(fù)制并達(dá)到 High Watermark 之后,Kafka 才確保消息不會(huì)丟失或被暫時(shí)消費(fèi)。

消費(fèi)者可以以 High Watermark 作為消費(fèi)的起點(diǎn),確保消費(fèi)的消息是可靠的和一致的。消費(fèi)者可以根據(jù) High Watermark 將已經(jīng)被確認(rèn)為"已復(fù)制"的消息進(jìn)行消費(fèi),而不用擔(dān)心未被復(fù)制的消息可能會(huì)丟失。

柚子快報(bào)激活碼778899分享:Kafka 小結(jié)

http://yzkb.51969.com/

相關(guān)閱讀

評(píng)論可見(jiàn),查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18561936.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問(wèn)

文章目錄