柚子快報(bào)邀請(qǐng)碼778899分享:kafka
柚子快報(bào)邀請(qǐng)碼778899分享:kafka
1. kafka 的結(jié)構(gòu)
Kafka 是一個(gè)分布式流處理平臺(tái),主要用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道和流應(yīng)用。Kafka 的結(jié)構(gòu)可以分為以下幾個(gè)主要部分:
Producer(生產(chǎn)者):
生產(chǎn)者負(fù)責(zé)將數(shù)據(jù)發(fā)布到 Kafka 中的主題(Topic)。生產(chǎn)者可以發(fā)送單個(gè)消息或者批量消息到指定的主題。 Consumer(消費(fèi)者):
消費(fèi)者從主題中讀取數(shù)據(jù)。消費(fèi)者可以是單個(gè)實(shí)例,也可以組成一個(gè)消費(fèi)者組(Consumer Group)。Kafka 支持多消費(fèi)者模型,每個(gè)消費(fèi)者組能夠獨(dú)立讀取主題中的數(shù)據(jù)。 Broker(代理):
Kafka 集群中的每個(gè)節(jié)點(diǎn)稱為一個(gè) Broker。Broker 負(fù)責(zé)接收、存儲(chǔ)和提供來自生產(chǎn)者的消息。一個(gè) Kafka 集群通常由多個(gè) Broker 組成,分布式存儲(chǔ)消息,以實(shí)現(xiàn)高可用性和容錯(cuò)性。 Topic(主題):
數(shù)據(jù)在 Kafka 中是通過主題組織的。生產(chǎn)者將數(shù)據(jù)發(fā)送到主題中,消費(fèi)者從主題中讀取數(shù)據(jù)。主題可以分為多個(gè)分區(qū)(Partition),分區(qū)使得主題能夠水平擴(kuò)展,以提高吞吐量和并行處理能力。 Partition(分區(qū)):
每個(gè)主題都可以被分為多個(gè)分區(qū),分區(qū)是 Kafka 中的基本并行單元。每個(gè)分區(qū)中的消息是有序的,但是跨分區(qū)的消息沒有全局順序。通過分區(qū),Kafka 能夠在集群中分布負(fù)載,并實(shí)現(xiàn)高并發(fā)的數(shù)據(jù)處理。 Zookeeper(協(xié)調(diào)服務(wù)):
Kafka 使用 Zookeeper 來管理集群的元數(shù)據(jù)、Broker 狀態(tài)、主題配置等信息。在新的 Kafka 版本中,Zookeeper 被逐漸替代為 Kafka 自帶的集群協(xié)調(diào)功能,但在舊的版本中,Zookeeper 是必需的。 Leader 和 Follower(領(lǐng)導(dǎo)者和追隨者):
每個(gè)分區(qū)都有一個(gè) Leader 和若干 Follower。Leader 負(fù)責(zé)處理生產(chǎn)者和消費(fèi)者的讀寫請(qǐng)求,而 Follower 只負(fù)責(zé)同步 Leader 中的數(shù)據(jù)。當(dāng) Leader 失效時(shí),Kafka 通過 Zookeeper 或自帶的協(xié)調(diào)功能選舉新的 Leader。
Kafka 的消息存儲(chǔ)和處理流程
生產(chǎn)者發(fā)送消息:生產(chǎn)者將消息發(fā)送到某個(gè)主題。消息會(huì)按照某種邏輯(如輪詢、鍵值哈希等)分發(fā)到該主題的不同分區(qū)。Broker 接收消息:每個(gè)主題的分區(qū)會(huì)分布在不同的 Broker 上,Broker 負(fù)責(zé)接收并存儲(chǔ)這些消息。消費(fèi)者消費(fèi)消息:消費(fèi)者從特定的分區(qū)讀取消息。Kafka 通過消費(fèi)者組(Consumer Group)管理消費(fèi)進(jìn)度,確保消息被所有需要的消費(fèi)者組消費(fèi)。
Kafka 的這種分布式架構(gòu)使其能夠處理海量的實(shí)時(shí)數(shù)據(jù)流,常用于日志收集、事件跟蹤、實(shí)時(shí)分析、消息隊(duì)列等場(chǎng)景。
2. 消息可靠性
Kafka 通過多種機(jī)制來保證消息的可靠性,確保消息在生產(chǎn)、傳輸、消費(fèi)的各個(gè)環(huán)節(jié)中不丟失、不重復(fù),并且達(dá)到高可用性。以下是 Kafka 保證消息可靠性的主要機(jī)制:
1. 消息確認(rèn)機(jī)制(ACK)
當(dāng)生產(chǎn)者發(fā)送消息時(shí),Kafka 會(huì)通過 acknowledgment(ACK) 來確認(rèn)消息是否成功寫入。生產(chǎn)者可以配置不同的 ACK 級(jí)別來決定消息確認(rèn)的方式:
acks=0:生產(chǎn)者不等待任何確認(rèn)。消息一旦發(fā)送就認(rèn)為已成功,不論 Kafka 是否接收。這種方式最快,但可靠性最低。acks=1:生產(chǎn)者等待 Leader 確認(rèn)消息已寫入。Leader 成功寫入后立即響應(yīng)生產(chǎn)者,但如果 Leader 在 Follower 完成同步前崩潰,消息可能丟失。acks=all:生產(chǎn)者等待所有副本(Leader 和所有 ISR 中的 Follower)確認(rèn)消息寫入成功。只有當(dāng)所有副本都寫入成功后,生產(chǎn)者才會(huì)收到確認(rèn),這是最高級(jí)別的可靠性保障。
2. ISR 機(jī)制(In-Sync Replicas,同步副本)
Kafka 的每個(gè)分區(qū)都有多個(gè)副本,通過 數(shù)據(jù)復(fù)制機(jī)制 來保證消息的持久性和容錯(cuò)性,包括一個(gè) Leader 副本 和多個(gè) Follower 副本。Leader 負(fù)責(zé)處理所有的寫入和讀取請(qǐng)求,而 Follower 負(fù)責(zé)從 Leader 同步數(shù)據(jù)。當(dāng)生產(chǎn)者發(fā)送消息到某個(gè)分區(qū)的 Leader 副本時(shí),Leader 會(huì)同步數(shù)據(jù)到該分區(qū)的所有 Follower 副本。這確保即使 Leader 宕機(jī),其他 Follower 副本可以選舉為新的 Leader,繼續(xù)提供服務(wù)。復(fù)制機(jī)制的數(shù)量由 replication.factor 配置決定,通常設(shè)置為 3,即每條消息會(huì)有 3 個(gè)副本,以提供更高的容災(zāi)能力。Kafka 使用 ISR(In-Sync Replicas)機(jī)制 來追蹤與 Leader 保持同步的 Follower 副本。當(dāng)生產(chǎn)者使用 acks=all 發(fā)送消息時(shí),Kafka 會(huì)確保消息被寫入所有 ISR 副本,只有在 ISR 中的所有副本都確認(rèn)收到消息后,才會(huì)向生產(chǎn)者返回成功確認(rèn)。這種機(jī)制保證了即使 Leader 崩潰,其他副本也能承擔(dān)工作,不會(huì)丟失數(shù)據(jù)。如果某個(gè)分區(qū)的 Leader 副本失效,Kafka 會(huì)通過 Zookeeper 或 KRaft 機(jī)制進(jìn)行 Leader 選舉,從該分區(qū)的 ISR 中選擇一個(gè)新的 Leader 副本來繼續(xù)處理讀寫請(qǐng)求。選舉過程通常非常快速,保證了 Kafka 的高可用性。
3. 持久化存儲(chǔ)與順序?qū)懭?/p>
Kafka 將消息以 順序?qū)懭?的方式持久化到磁盤中,并且通過文件系統(tǒng)緩存來提升寫入性能。這種順序?qū)懭胂啾入S機(jī)寫入更高效,減少了磁盤的 I/O 操作,并且即使系統(tǒng)崩潰,已寫入磁盤的消息也不會(huì)丟失。Kafka 在磁盤上以日志文件的形式存儲(chǔ)數(shù)據(jù),并為每個(gè)分區(qū)的消息維護(hù)一個(gè) offset(偏移量)。消費(fèi)者通過 offset 來追蹤自己消費(fèi)的進(jìn)度,確保能夠從正確的位置繼續(xù)消費(fèi)。
4. 冪等性生產(chǎn)者(Idempotent Producer)
Kafka 支持 冪等性生產(chǎn)者,即生產(chǎn)者可以確保每條消息在同一分區(qū)中只寫入一次,即使因?yàn)榫W(wǎng)絡(luò)故障或重試機(jī)制導(dǎo)致同一消息被多次發(fā)送,Kafka 也能確保該消息只被寫入一次,避免消息重復(fù)。開啟冪等性后,Kafka 給每個(gè)生產(chǎn)者分配一個(gè)唯一的 PID(Producer ID)并為每個(gè)消息分配序列號(hào),通過這種機(jī)制 Kafka 能夠檢測(cè)到重復(fù)的消息寫入,并丟棄重復(fù)的消息。
5. 事務(wù)性生產(chǎn)者(Transactional Producer)
Kafka 支持 事務(wù)性生產(chǎn)者,允許生產(chǎn)者在多個(gè)分區(qū)上實(shí)現(xiàn)全局的事務(wù)操作,確保一組消息要么全部成功寫入,要么全部失敗。這對(duì)于保證跨分區(qū)、跨主題的操作一致性非常重要。事務(wù)性生產(chǎn)者通過 beginTransaction、commitTransaction、abortTransaction 等 API 來管理事務(wù)操作,這種機(jī)制能夠避免部分消息被寫入的情況,從而確保數(shù)據(jù)的一致性和完整性。
6. 消費(fèi)者的 Offset 管理
消費(fèi)者 offset 是消費(fèi)者在分區(qū)中消費(fèi)的進(jìn)度標(biāo)記。Kafka 提供兩種方式來管理 offset:
自動(dòng)提交(auto commit):Kafka 會(huì)在特定時(shí)間間隔(通過 auto.commit.interval.ms 配置)自動(dòng)提交消費(fèi)者的消費(fèi)進(jìn)度。如果系統(tǒng)崩潰,可能會(huì)導(dǎo)致消費(fèi)的部分消息未被提交,導(dǎo)致消息重復(fù)消費(fèi)。手動(dòng)提交:消費(fèi)者可以手動(dòng)管理 offset,只有在確認(rèn)處理完當(dāng)前消息后,才會(huì)提交消費(fèi)進(jìn)度。這種方式允許消費(fèi)者在系統(tǒng)崩潰后,從上一次成功消費(fèi)的位置繼續(xù)消費(fèi),保證消息不丟失。
7. 日志保留策略(Log Retention Policy)
Kafka 提供靈活的 日志保留策略 來管理消息的存儲(chǔ)時(shí)間和空間:
基于時(shí)間的保留:可以配置 Kafka 保留消息的時(shí)間長(zhǎng)度,例如 log.retention.hours 指定 Kafka 將消息保留一定時(shí)間后自動(dòng)刪除?;诖笮〉谋A簦嚎梢栽O(shè)置日志文件的大小上限,達(dá)到上限后 Kafka 會(huì)自動(dòng)刪除舊的日志文件?;?Log Compaction(日志壓縮):Kafka 支持日志壓縮,Kafka 會(huì)保留每個(gè)消息鍵的最新版本,從而減少存儲(chǔ)空間的占用,同時(shí)保留歷史數(shù)據(jù)的最新狀態(tài)。
8. 流處理的可靠性(Exactly Once 語義)
Kafka 通過冪等性生產(chǎn)者和事務(wù)性生產(chǎn)者的結(jié)合,提供了 Exactly Once 語義(EOS),確保消息在生產(chǎn)、傳輸和消費(fèi)時(shí)不會(huì)出現(xiàn)丟失或重復(fù)的情況。這種語義尤其重要于流式處理場(chǎng)景,比如使用 Kafka Streams 框架時(shí),可以確保在流處理應(yīng)用中,消息處理的結(jié)果是精確一致的。
總結(jié) Kafka 通過 ACK 機(jī)制、ISR 副本同步、持久化存儲(chǔ)、Leader 選舉、冪等性、事務(wù)性支持以及消費(fèi)者的 offset 管理等多個(gè)機(jī)制共同確保了消息在生產(chǎn)、傳輸、存儲(chǔ)和消費(fèi)各個(gè)環(huán)節(jié)的可靠性。這些設(shè)計(jì)讓 Kafka 能夠在分布式環(huán)境中提供高可用、高可靠的消息傳輸服務(wù)。
3. 消息有序性
Kafka 在主題(Topic)層面上不能保證全局消息有序,但它能在 分區(qū)(Partition) 內(nèi)保證消息的嚴(yán)格順序。具體來說,Kafka 保證消息在同一個(gè)分區(qū)內(nèi)按照生產(chǎn)的順序被寫入,并且消費(fèi)者也會(huì)按照相同的順序消費(fèi)這些消息。
在 Kafka 中,消息的順序性是以同一個(gè)分區(qū)和同一個(gè)消費(fèi)者組為基礎(chǔ)的。這樣說是正確的。
在 Kafka 中,如果有多個(gè)生產(chǎn)者同時(shí)向同一個(gè)分區(qū)發(fā)送消息,那么消息的有序性將無法得到保證。這種情況下可以考慮引入第三方進(jìn)行判斷控制,比如:在消息中加上時(shí)間戳字段,不過這樣的話會(huì)導(dǎo)致性能嚴(yán)重下降,得不償失。
1. 分區(qū)級(jí)別的消息順序
分區(qū)內(nèi)順序保證:Kafka 確保同一分區(qū)內(nèi)的消息是按生產(chǎn)者的寫入順序存儲(chǔ)的。無論生產(chǎn)者發(fā)送多少消息到這個(gè)分區(qū),消息在寫入時(shí)都會(huì)嚴(yán)格按照發(fā)送的順序被追加到該分區(qū)的日志文件中。因此,消費(fèi)者讀取消息時(shí),會(huì)按照消息的順序依次消費(fèi)。多分區(qū)的無序性:如果一個(gè)主題有多個(gè)分區(qū),不同分區(qū)之間的消息順序并不保證。因?yàn)?Kafka 在不同的分區(qū)上并行處理數(shù)據(jù),跨分區(qū)的消息沒有全局順序。因此,只有分區(qū)內(nèi)是有序的。
2. 使用消息鍵(Message Key)
生產(chǎn)者在發(fā)送消息時(shí)可以指定 消息鍵(Key)。Kafka 使用該鍵來決定消息要被發(fā)送到哪個(gè)分區(qū)。使用相同的鍵可以保證所有相關(guān)的消息都會(huì)路由到同一個(gè)分區(qū)。這意味著如果生產(chǎn)者對(duì)某一類型的消息(例如同一用戶的操作或同一個(gè)訂單的事件)使用相同的鍵,則這些消息會(huì)被寫入到同一個(gè)分區(qū),并保證它們的順序性。通過這種方式,Kafka 可以在某個(gè)特定粒度(如按用戶、訂單等)上保證消息的有序性。
3. 單分區(qū)的應(yīng)用場(chǎng)景
對(duì)于必須嚴(yán)格保證消息全局有序的場(chǎng)景,通常使用單個(gè)分區(qū)的主題。單分區(qū)保證所有消息都順序?qū)懭氩㈨樞蜃x取。但使用單分區(qū)意味著吞吐量受到限制,因?yàn)?Kafka 的并行處理能力依賴于多個(gè)分區(qū)。如果應(yīng)用對(duì)性能有較高的要求,這種方式的擴(kuò)展性會(huì)受到限制。
4. 生產(chǎn)者的冪等性(Idempotent Producer)
Kafka 支持 冪等性生產(chǎn)者,使得生產(chǎn)者可以確保在分區(qū)中按順序發(fā)送消息并且消息不會(huì)重復(fù)。這對(duì)有序性有幫助,因?yàn)閮绲刃陨a(chǎn)者會(huì)跟蹤消息的順序,并保證即使在重試的情況下,消息依然按順序?qū)懭搿绲刃陨a(chǎn)者通過每個(gè)分區(qū)上的序列號(hào)來確保消息的有序性和不重復(fù)。當(dāng)開啟冪等性時(shí),Kafka 確保每個(gè)分區(qū)的消息按序?qū)懭氩⑶也粫?huì)出現(xiàn)重復(fù)消息,從而維護(hù)分區(qū)內(nèi)的有序性。
5. 消費(fèi)者的順序消費(fèi)
消費(fèi)者在分區(qū)內(nèi)的順序消費(fèi):消費(fèi)者讀取消息時(shí),Kafka 確保每個(gè)分區(qū)的消息按照它們被寫入的順序依次讀取。消費(fèi)者通過 offset(偏移量)來管理消費(fèi)進(jìn)度,Kafka 記錄每個(gè)消費(fèi)者在每個(gè)分區(qū)上的消費(fèi)位置,消費(fèi)者從上次消費(fèi)的 offset 處繼續(xù)讀取,保持分區(qū)內(nèi)消息的順序性。單線程消費(fèi)分區(qū):如果一個(gè)消費(fèi)者實(shí)例同時(shí)消費(fèi)多個(gè)分區(qū),Kafka 并不能保證跨分區(qū)的消費(fèi)順序。如果消息有嚴(yán)格的順序需求,最好讓每個(gè)消費(fèi)者實(shí)例只消費(fèi)一個(gè)分區(qū),避免并發(fā)處理導(dǎo)致的順序問題。
6. 避免分區(qū)重新分配的影響
當(dāng) Kafka 的分區(qū)重新分配(例如某個(gè)消費(fèi)者崩潰,導(dǎo)致 Kafka 將它的分區(qū)分配給其他消費(fèi)者)時(shí),消費(fèi)者可能會(huì)短暫中斷。在此期間,Kafka 會(huì)確保已提交的 offset 仍然有效,新的消費(fèi)者從上次提交的位置繼續(xù)消費(fèi),避免消息的亂序。為了更好地控制分區(qū)重新分配時(shí)的順序問題,應(yīng)用程序可以手動(dòng)管理 offset 提交。這樣可以確保消息處理完后才提交 offset,從而避免消息亂序或丟失。
7. 復(fù)制機(jī)制中的順序保障
Kafka 的復(fù)制機(jī)制通過 Leader-Follower 架構(gòu)來保證消息的有序性。生產(chǎn)者只能將消息寫入分區(qū)的 Leader 副本,并且只有當(dāng)所有同步副本(ISR)都確認(rèn)收到消息后,Kafka 才會(huì)認(rèn)為該消息被成功提交(在 acks=all 的情況下)。Follower 會(huì)按照 Leader 的順序復(fù)制消息,從而保證所有副本中的消息順序一致。當(dāng) Leader 副本崩潰時(shí),Kafka 會(huì)從同步的 Follower 副本中選出一個(gè)新的 Leader,該副本的數(shù)據(jù)順序與之前的 Leader 一致,因此不會(huì)破壞分區(qū)內(nèi)的消息順序。
總結(jié)
Kafka 通過以下機(jī)制來保證消息的有序性:
分區(qū)級(jí)別的順序保證:在同一個(gè)分區(qū)內(nèi),消息按順序?qū)懭牒拖M(fèi)。使用消息鍵保證相關(guān)消息路由到同一個(gè)分區(qū):確保具有相同鍵的消息在分區(qū)內(nèi)有序。冪等性生產(chǎn)者:避免重復(fù)消息并保證消息按順序?qū)懭搿OM(fèi)者按 offset 順序讀?。捍_保分區(qū)內(nèi)的消息順序消費(fèi)。Leader-Follower 架構(gòu)的同步復(fù)制:在副本之間保持消息的順序一致性。
這些機(jī)制共同確保了 Kafka 在高并發(fā)和分布式環(huán)境下能夠保證消息的有序性,特別是在分區(qū)內(nèi)的順序處理。
4. 高吞吐量原因
Kafka 的設(shè)計(jì)旨在處理大量數(shù)據(jù)流,并在高并發(fā)環(huán)境中提供極高的吞吐量。Kafka 能夠?qū)崿F(xiàn)高吞吐量的原因包括以下幾個(gè)關(guān)鍵點(diǎn):
1. 順序?qū)懭肴罩荆⊿equential Write)
Kafka 的核心是基于日志文件的持久化存儲(chǔ),消息被順序?qū)懭氪疟P。這種 順序?qū)懭?相比隨機(jī)寫入大大減少了磁盤 I/O 操作,因?yàn)橛脖P順序?qū)懭氡入S機(jī)寫入效率高得多。即使在磁盤上的操作,順序?qū)懭胍彩欠浅?斓摹,F(xiàn)代操作系統(tǒng)在順序?qū)懭雸?chǎng)景中提供了文件系統(tǒng)緩存,進(jìn)一步提升了寫入速度。Kafka 利用操作系統(tǒng)的頁緩存將數(shù)據(jù)批量寫入磁盤,減少了磁盤操作的頻率。
2. 零拷貝(Zero Copy)
Kafka 采用了 Linux 提供的 零拷貝(Zero Copy) 技術(shù)。這種技術(shù)允許 Kafka 在處理數(shù)據(jù)時(shí),能夠直接從磁盤把數(shù)據(jù)復(fù)制到網(wǎng)絡(luò)緩沖區(qū),而不經(jīng)過用戶態(tài)的內(nèi)存拷貝。零拷貝減少了 CPU 的負(fù)載,避免了傳統(tǒng)的從磁盤讀入內(nèi)存再發(fā)送到網(wǎng)絡(luò)的多次拷貝過程,大幅度提高了網(wǎng)絡(luò)傳輸?shù)男?。這是 Kafka 實(shí)現(xiàn)高吞吐量的一大關(guān)鍵。
3. 分區(qū)(Partitioning)和并行處理
Kafka 通過將主題分為多個(gè) 分區(qū)(Partition) 來支持并行處理。每個(gè)分區(qū)是獨(dú)立的日志文件,生產(chǎn)者和消費(fèi)者可以同時(shí)讀寫不同的分區(qū)。這種設(shè)計(jì)使得 Kafka 的吞吐量能夠隨著分區(qū)數(shù)和節(jié)點(diǎn)數(shù)的增加而水平擴(kuò)展。多個(gè)生產(chǎn)者可以同時(shí)向不同的分區(qū)寫入消息,而多個(gè)消費(fèi)者可以從不同分區(qū)讀取消息,從而提高了系統(tǒng)的整體并發(fā)處理能力。
4. 批量處理(Batching)
Kafka 支持生產(chǎn)者和消費(fèi)者批量處理消息。生產(chǎn)者在發(fā)送消息時(shí)可以通過批量發(fā)送來減少網(wǎng)絡(luò)請(qǐng)求的次數(shù),消費(fèi)者在消費(fèi)時(shí)也可以一次拉取多個(gè)消息進(jìn)行處理。批量處理降低了生產(chǎn)者和消費(fèi)者與 Kafka 之間的網(wǎng)絡(luò)開銷,同時(shí)提升了吞吐量。例如,生產(chǎn)者可以將多條消息緩存在內(nèi)存中,然后一次性發(fā)送到 Kafka,從而減少頻繁的網(wǎng)絡(luò) I/O 操作。
5. 異步發(fā)送(Asynchronous Send)
Kafka 允許生產(chǎn)者采用 異步發(fā)送 模式,即消息發(fā)送后不需要等待服務(wù)器確認(rèn)立即返回。這種非阻塞的方式避免了生產(chǎn)者在發(fā)送每條消息時(shí)的延遲,極大地提高了生產(chǎn)者的發(fā)送速度。生產(chǎn)者可以通過配置來控制消息發(fā)送的確認(rèn)方式(acks),以及批量發(fā)送的大小和時(shí)間窗口,從而在吞吐量和可靠性之間進(jìn)行權(quán)衡。
6. 可配置的消息持久化策略
Kafka 提供了靈活的持久化策略。例如,Kafka 允許配置消息的副本數(shù)量(replication factor),決定每條消息存儲(chǔ)在多少個(gè)節(jié)點(diǎn)上。如果設(shè)置較低的副本數(shù)量,可以減少數(shù)據(jù)復(fù)制的負(fù)擔(dān),提升吞吐量。此外,Kafka 支持通過配置控制消息的持久化方式和存儲(chǔ)時(shí)間(如 log.retention.bytes 和 log.retention.hours),以平衡吞吐量和持久化需求。
7. 壓縮機(jī)制
Kafka 支持 消息壓縮,生產(chǎn)者可以在發(fā)送消息前對(duì)其進(jìn)行壓縮(例如使用 Gzip、Snappy 或 LZ4)。壓縮消息后,數(shù)據(jù)量減少,網(wǎng)絡(luò)傳輸?shù)拈_銷也隨之降低,提升了消息的傳輸效率。消息壓縮特別適用于高頻率的小消息場(chǎng)景,通過壓縮可以顯著減少帶寬占用,提高系統(tǒng)整體吞吐量。
8. 副本復(fù)制異步化(Asynchronous Replication)
Kafka 在復(fù)制數(shù)據(jù)時(shí)使用了 異步復(fù)制機(jī)制,即數(shù)據(jù)首先被寫入到分區(qū)的 Leader 副本,然后再異步地復(fù)制到 Follower 副本上。這種設(shè)計(jì)使得生產(chǎn)者的寫入操作不需要等待所有副本同步完成,從而減少了寫入延遲,提升了吞吐量。如果生產(chǎn)者配置了 acks=1 或 acks=0,則只要 Leader 成功寫入消息即可返回確認(rèn),而不必等待所有 Follower 同步完成。這在不要求最高可靠性的場(chǎng)景中可以極大提升吞吐量。
9. 分布式架構(gòu)和負(fù)載均衡
Kafka 的 分布式架構(gòu) 允許多個(gè) Broker(節(jié)點(diǎn))共同處理消息,每個(gè) Broker 可以負(fù)責(zé)不同主題的分區(qū)。分區(qū)機(jī)制結(jié)合多節(jié)點(diǎn)的架構(gòu),使 Kafka 能夠橫向擴(kuò)展處理能力,隨著 Broker 的增加,系統(tǒng)的吞吐量也相應(yīng)提高。Kafka 通過 負(fù)載均衡 機(jī)制,將不同分區(qū)分布到不同的 Broker 上,從而使整個(gè)集群能夠更好地利用硬件資源,避免單個(gè)節(jié)點(diǎn)的性能瓶頸。
10. Zookeeper 協(xié)調(diào)(或 KRaft)
Kafka 使用 Zookeeper 來管理集群的元數(shù)據(jù)和分區(qū) Leader 的選舉,但 Zookeeper 的負(fù)載很小。Kafka 的 Broker 之間主要通過直接通信進(jìn)行協(xié)調(diào),Zookeeper 僅在發(fā)生 Leader 選舉或元數(shù)據(jù)變更時(shí)起作用。這種架構(gòu)設(shè)計(jì)讓 Kafka 的正常操作與 Zookeeper 的負(fù)載解耦,大部分時(shí)間 Zookeeper 處于低負(fù)載狀態(tài),因此不會(huì)成為吞吐量的瓶頸。
11. 消費(fèi)者的并行拉?。≒ull)模式
Kafka 的消費(fèi)者使用 拉取(Pull)模式,消費(fèi)者主動(dòng)從 Kafka 中拉取消息,而不是由 Kafka 推送消息。這種設(shè)計(jì)可以讓消費(fèi)者根據(jù)自己的處理能力決定何時(shí)拉取消息,避免了消費(fèi)者因處理不過來而被推送的消息淹沒,降低了系統(tǒng)的壓力。同時(shí),消費(fèi)者可以并行處理來自不同分區(qū)的消息,這進(jìn)一步提高了消費(fèi)端的吞吐量。
總結(jié)
Kafka 實(shí)現(xiàn)高吞吐量的關(guān)鍵在于:
順序?qū)懭?和 零拷貝 技術(shù)極大優(yōu)化了磁盤和網(wǎng)絡(luò) I/O。分區(qū)機(jī)制 支持并行處理,允許系統(tǒng)擴(kuò)展。批量處理 和 異步發(fā)送 減少了網(wǎng)絡(luò)和 CPU 資源消耗。異步復(fù)制 和 可配置的可靠性 提供了性能與可靠性的靈活平衡。壓縮機(jī)制 和 消費(fèi)者拉取模式 降低了帶寬和處理負(fù)載。
這些設(shè)計(jì)讓 Kafka 能夠在處理大量數(shù)據(jù)流時(shí)保持極高的吞吐量,同時(shí)還能擴(kuò)展到大規(guī)模的分布式環(huán)境中。
5. 解決消息堆積問題
在 Kafka 系統(tǒng)中,消息堆積 問題是指消息不斷地被生產(chǎn),但消費(fèi)者無法及時(shí)消費(fèi),導(dǎo)致消息在 Kafka 隊(duì)列中積壓。消息堆積如果不及時(shí)處理,會(huì)導(dǎo)致系統(tǒng)性能下降甚至崩潰。解決消息堆積問題需要從多方面優(yōu)化 Kafka 的架構(gòu)和配置,下面是一些常見的解決方案:
1. 增加消費(fèi)者數(shù)量
擴(kuò)展消費(fèi)者數(shù)量 是最直接的方法,通過 消費(fèi)者組(Consumer Group) 增加消費(fèi)端的并發(fā)能力。Kafka 中,每個(gè)分區(qū)只能被消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi),因此如果消費(fèi)者的數(shù)量小于分區(qū)的數(shù)量,會(huì)限制消費(fèi)速度。增加消費(fèi)者數(shù)量使得更多的分區(qū)能夠同時(shí)被多個(gè)消費(fèi)者并行消費(fèi),從而加快消費(fèi)速度,緩解消息堆積的問題。
2. 增加分區(qū)數(shù)量
增加分區(qū)數(shù)量 可以提高 Kafka 的并行處理能力。一個(gè)分區(qū)只能被一個(gè)消費(fèi)者實(shí)例消費(fèi),因此如果分區(qū)數(shù)量有限,增加消費(fèi)者實(shí)例也無法提高消費(fèi)并發(fā)。通過增加分區(qū)數(shù)量,擴(kuò)展 kafka 集群可以增加更多的 Broker,讓增加的分區(qū)分配到不同的 Broker 上,可以增加系統(tǒng)的吞吐量,并允許更多消費(fèi)者并發(fā)消費(fèi)。需要注意的是,分區(qū)數(shù)增加后,生產(chǎn)者的負(fù)載和 Kafka 的元數(shù)據(jù)管理開銷也會(huì)增加,必須在擴(kuò)展前做好容量規(guī)劃。
3. 優(yōu)化消費(fèi)者的消費(fèi)能力
提高消費(fèi)者處理能力 可以有效解決消費(fèi)端的瓶頸。消費(fèi)者處理消息的速度慢,可能是由于處理邏輯復(fù)雜、I/O 操作頻繁、計(jì)算資源不足等原因??梢圆扇∫韵聝?yōu)化措施:
異步處理:消費(fèi)者可以在消費(fèi)消息后立即返回,而不是等待處理完成后才拉取下一批消息。通過異步處理,消費(fèi)者能夠更快地拉取消息,提高消費(fèi)速度。批量消費(fèi):配置消費(fèi)者批量拉取消息,而不是一次處理一條。通過 fetch.min.bytes 和 fetch.max.wait.ms 配置,可以調(diào)整消費(fèi)者每次拉取的消息量和等待時(shí)間,從而減少網(wǎng)絡(luò)請(qǐng)求的次數(shù)。優(yōu)化處理邏輯:分析和優(yōu)化消費(fèi)者內(nèi)部的處理邏輯,盡量減少不必要的 I/O 操作,使用多線程、異步任務(wù)等技術(shù)提升處理效率。
4. 壓縮日志文件
Kafka 的日志壓縮(Log Compaction) 機(jī)制可以減少存儲(chǔ)空間,并且?guī)椭幚矶逊e問題。日志壓縮會(huì)保留每個(gè)消息鍵的最新版本,刪除冗余的舊消息,從而減少堆積消息的數(shù)量。特別是對(duì)于需要持久保存的數(shù)據(jù)流或長(zhǎng)時(shí)間保留的消息,日志壓縮可以有效減少堆積的消息量。
5. 增加硬件資源
消息堆積可能是由于 Kafka 集群資源不足造成的。增加硬件資源(例如更快的磁盤、更多的 CPU 核心或增加內(nèi)存)能夠顯著提高 Kafka 的處理能力。磁盤性能:Kafka 使用磁盤順序?qū)懭雭沓志没?,因此磁盤 I/O 是 Kafka 性能的關(guān)鍵因素。使用更快的 SSD 磁盤或者增加磁盤帶寬,可以提高 Kafka 寫入和讀取的性能,減少消息堆積。CPU 和內(nèi)存:如果 Kafka 消費(fèi)者或生產(chǎn)者的處理速度受限于 CPU 或內(nèi)存資源,增加這些資源也能幫助提升吞吐量,緩解堆積問題。
6. 監(jiān)控和預(yù)警
使用 Kafka 的監(jiān)控工具(如 Prometheus + Grafana、Kafka Manager、Confluent Control Center 等)持續(xù)監(jiān)控 Kafka 集群的狀態(tài),包括消息堆積量、消費(fèi)者延遲、分區(qū)的負(fù)載情況等。設(shè)置適當(dāng)?shù)念A(yù)警機(jī)制,在發(fā)現(xiàn)消息堆積超過閾值時(shí)能夠及時(shí)采取措施,避免堆積問題惡化。
7. 流量控制(Throttling)
在某些情況下,消息堆積可能是因?yàn)樯a(chǎn)者發(fā)送數(shù)據(jù)過快,超過了消費(fèi)者的處理能力??梢酝ㄟ^對(duì)生產(chǎn)者進(jìn)行 流量控制(Throttling) 來限制其發(fā)送速度,確保生產(chǎn)和消費(fèi)的平衡。例如,通過設(shè)置 Kafka 的生產(chǎn)者限流參數(shù)(如 max.in.flight.requests.per.connection)或引入限流中間件,控制流量速率,防止產(chǎn)生大量的積壓。
總結(jié) Kafka 解決消息堆積問題需要從多個(gè)角度入手:
增加消費(fèi)者和分區(qū)的數(shù)量以提高并行處理能力。優(yōu)化消費(fèi)者的消費(fèi)邏輯、批量消費(fèi)和異步處理。擴(kuò)展 Kafka 集群或增加硬件資源,提高系統(tǒng)的吞吐量。調(diào)整生產(chǎn)者和副本同步的配置,優(yōu)化發(fā)送策略。持續(xù)監(jiān)控 Kafka 集群的狀態(tài),及時(shí)預(yù)警和采取措施。
通過這些方法,可以有效解決 Kafka 中的消息堆積問題,確保系統(tǒng)能夠穩(wěn)定、高效地處理大量數(shù)據(jù)流。
6. 提高生產(chǎn)者的發(fā)送效率
批量發(fā)送:通過配置生產(chǎn)者的 batch.size 和 linger.ms,生產(chǎn)者可以在內(nèi)存中批量聚合消息,達(dá)到一定數(shù)量或等待指定時(shí)間后再統(tǒng)一發(fā)送。這樣可以減少網(wǎng)絡(luò)請(qǐng)求的頻率,提升發(fā)送效率。壓縮消息:?jiǎn)⒂蒙a(chǎn)者的消息壓縮(如 Snappy、Gzip 或 LZ4),減少每次發(fā)送的數(shù)據(jù)量,提升傳輸效率。異步發(fā)送:使用異步發(fā)送的方式,減少生產(chǎn)者阻塞,提升整體吞吐量。
7. 漏消費(fèi)和重復(fù)消費(fèi)
1. 漏消費(fèi)
Kafka 保證至少一次的消息傳遞,不會(huì)漏掉消息,但邏輯處理可能漏掉
如果自動(dòng)提交 offset 的頻率過高,消費(fèi)者還沒有完成消息處理就已經(jīng)提交了最新的 offset。一旦消費(fèi)者在處理消息時(shí)發(fā)生錯(cuò)誤或崩潰,這些已經(jīng)被提交的消息可能不會(huì)被再次處理,這會(huì)給人一種“漏消費(fèi)”的感覺。事實(shí)上,這屬于邏輯處理失敗而非 Kafka 消息傳遞問題。
如何避免這個(gè)問題
手動(dòng)提交 offset:
為了更精準(zhǔn)控制 offset 提交,可以關(guān)閉自動(dòng)提交(enable.auto.commit=false),并在消息處理完成后手動(dòng)提交 offset。這樣可以確保只有在消息成功處理后,Kafka 才更新消費(fèi)進(jìn)度。
事務(wù)性消費(fèi):
如果需要確保消息的處理和 offset 提交是一個(gè)原子操作,Kafka 提供了事務(wù)性的生產(chǎn)和消費(fèi)支持,可以確保消費(fèi)和處理的整個(gè)過程是一致的,從而防止因?yàn)楣收蠈?dǎo)致的處理邏輯錯(cuò)誤或漏處理問題。
2. 重復(fù)消費(fèi)
消費(fèi)者在消費(fèi)完一批消息后提交了 offset,但提交操作失敗導(dǎo)致 Kafka 未正確記錄提交的位移。消費(fèi)者重啟,Kafka 會(huì)從上一個(gè)已提交的 offset 開始重新消費(fèi),這就會(huì)導(dǎo)致已經(jīng)處理過的消息再次被消費(fèi)。如果消費(fèi)者在消費(fèi)完消息后遇到網(wǎng)絡(luò)中斷或者消費(fèi)者故障,消息處理可能已經(jīng)成功,但消費(fèi)者未能提交 offset,從而導(dǎo)致在重啟或故障恢復(fù)后從上次已提交的位置重新開始,導(dǎo)致已經(jīng)處理過的消息被重新消費(fèi)。當(dāng)消費(fèi)者組中的消費(fèi)者數(shù)量發(fā)生變化(例如,加入或移除消費(fèi)者)時(shí),Kafka 會(huì)觸發(fā)分區(qū)重新平衡,將分區(qū)重新分配給消費(fèi)者。如果重新平衡發(fā)生時(shí),有些消費(fèi)者尚未提交處理的 offset,新的消費(fèi)者可能會(huì)從上一次提交的 offset 開始重新消費(fèi),導(dǎo)致重復(fù)消費(fèi)。
解決方法
確保冪等性:為了解決重復(fù)消費(fèi)問題,可以確保消費(fèi)者的業(yè)務(wù)邏輯具有冪等性,即相同的消息處理多次不會(huì)影響結(jié)果。通過記錄已處理的消息 ID 或使用外部事務(wù)機(jī)制,可以防止重復(fù)處理的副作用。事務(wù)支持:Kafka 提供了事務(wù)性生產(chǎn)者和消費(fèi)者的功能,確保消息的生產(chǎn)和消費(fèi)可以在一個(gè)事務(wù)中完成,從而避免重復(fù)消費(fèi)的情況發(fā)生。
8. 至少一次的消息傳遞
1. 故障恢復(fù)
消費(fèi)者故障恢復(fù):如果消費(fèi)者在處理消息時(shí)崩潰,Kafka 會(huì)在下次啟動(dòng)時(shí)從上一次已提交的 offset 開始重新消費(fèi),從而確保不會(huì)漏掉任何消息。分區(qū)重新平衡:在消費(fèi)者組中,當(dāng)消費(fèi)者數(shù)量發(fā)生變化時(shí),Kafka 會(huì)進(jìn)行分區(qū)重新平衡,確保所有分區(qū)都有消費(fèi)者處理。
Kafka 通過持久化存儲(chǔ)、消息確認(rèn)機(jī)制、消費(fèi)者的 offset 提交、分區(qū)與副本機(jī)制、冪等性以及故障恢復(fù)等手段,確保實(shí)現(xiàn)至少一次的消息傳遞語義。這種設(shè)計(jì)使得 Kafka 在高可用性和高吞吐量的同時(shí),依然能夠保證消息的可靠性。
9. 消費(fèi)者組
在 Kafka 中引入消費(fèi)者組(Consumer Group)的設(shè)計(jì)是為了增強(qiáng)系統(tǒng)的并行消費(fèi)能力和消息處理的靈活性。消費(fèi)者組是 Kafka 中消費(fèi)者架構(gòu)的核心,幫助系統(tǒng)實(shí)現(xiàn)橫向擴(kuò)展和消息處理的負(fù)載均衡。以下是消費(fèi)者組存在的主要原因:
1. 實(shí)現(xiàn)并行消費(fèi),提高消費(fèi)效率
Kafka 的每個(gè)主題(topic)可以有多個(gè)分區(qū)(partition),而每個(gè)分區(qū)只能被一個(gè)消費(fèi)者消費(fèi)(在同一個(gè)消費(fèi)者組內(nèi))。如果沒有消費(fèi)者組,單個(gè)消費(fèi)者必須從所有分區(qū)中消費(fèi)消息,這會(huì)導(dǎo)致消費(fèi)速度瓶頸。
通過消費(fèi)者組,Kafka 可以將分區(qū)分配給組內(nèi)不同的消費(fèi)者,這樣每個(gè)消費(fèi)者可以并行處理不同的分區(qū)的數(shù)據(jù),從而提升消費(fèi)的效率。
一個(gè)消費(fèi)者組中的多個(gè)消費(fèi)者可以同時(shí)處理多個(gè)分區(qū)的數(shù)據(jù)。這提供了良好的橫向擴(kuò)展性,隨著消息量的增加,你可以通過增加更多消費(fèi)者來提升系統(tǒng)的處理能力。
2. 負(fù)載均衡和故障轉(zhuǎn)移
消費(fèi)者組可以幫助 Kafka 實(shí)現(xiàn)負(fù)載均衡,即在一個(gè)消費(fèi)者組中的消費(fèi)者可以自動(dòng)分配和重新分配分區(qū)。
自動(dòng)負(fù)載均衡:Kafka 會(huì)將分區(qū)分配給組中的消費(fèi)者。如果有新的消費(fèi)者加入組,Kafka 會(huì)進(jìn)行重新分配(Rebalance),將現(xiàn)有的分區(qū)重新分配給更多消費(fèi)者,以便更均勻地分擔(dān)消費(fèi)負(fù)擔(dān)。容錯(cuò)和故障轉(zhuǎn)移:如果某個(gè)消費(fèi)者發(fā)生故障或退出組,Kafka 會(huì)自動(dòng)將該消費(fèi)者負(fù)責(zé)的分區(qū)分配給組內(nèi)其他消費(fèi)者,確保系統(tǒng)能夠繼續(xù)正常消費(fèi)。這使得 Kafka 系統(tǒng)具備了高容錯(cuò)性,不會(huì)因?yàn)槟硞€(gè)消費(fèi)者的故障而停止整個(gè)消費(fèi)過程。
3. 消息的分組消費(fèi)(保證消息的唯一消費(fèi))
消費(fèi)者組確保每條消息在一個(gè)消費(fèi)者組內(nèi)只被消費(fèi)一次(即每個(gè)分區(qū)的消息只由一個(gè)消費(fèi)者處理)。這與 Kafka 的至少一次語義相結(jié)合,能夠保證同一組內(nèi)不會(huì)有多個(gè)消費(fèi)者重復(fù)消費(fèi)同一個(gè)分區(qū)的消息。
獨(dú)占消費(fèi):Kafka 通過消費(fèi)者組機(jī)制保證每個(gè)分區(qū)的數(shù)據(jù)只會(huì)被該組中的一個(gè)消費(fèi)者處理。對(duì)于多個(gè)消費(fèi)者的場(chǎng)景,它避免了消息被重復(fù)消費(fèi),確保消息處理的唯一性。
4. 消息的廣播消費(fèi)
消費(fèi)者組還支持不同的消費(fèi)者組獨(dú)立消費(fèi)相同的消息流。例如:
一個(gè)主題(topic)中的消息可以被多個(gè)不同的消費(fèi)者組訂閱。每個(gè)組中的消費(fèi)者獨(dú)立消費(fèi)同一個(gè)主題的消息,互不干擾。這意味著多個(gè)獨(dú)立的系統(tǒng)或服務(wù)可以并行地從同一個(gè) Kafka 主題中消費(fèi)消息,而每個(gè)系統(tǒng)只消費(fèi)一次消息副本。這對(duì)于不同的應(yīng)用場(chǎng)景非常有用,例如一個(gè)消費(fèi)者組處理訂單,另一個(gè)消費(fèi)者組處理日志。
5. 擴(kuò)展性和靈活性
通過消費(fèi)者組,Kafka 提供了高度的擴(kuò)展性和靈活性:
可擴(kuò)展消費(fèi)能力:可以隨時(shí)增加消費(fèi)者到組中以提升吞吐量,或在流量減少時(shí)減少消費(fèi)者以節(jié)省資源。靈活的應(yīng)用場(chǎng)景:不同的消費(fèi)者組可以訂閱相同的主題,執(zhí)行不同的處理邏輯,允許數(shù)據(jù)在多種不同的應(yīng)用中被獨(dú)立處理。
總結(jié)
消費(fèi)者組在 Kafka 中的存在帶來了以下好處:
允許并行處理消息,提升消費(fèi)效率。實(shí)現(xiàn)消費(fèi)者間的負(fù)載均衡和故障轉(zhuǎn)移,保證系統(tǒng)的高可用性。確保在同一組中每條消息只被消費(fèi)一次,避免重復(fù)消費(fèi)。支持多組消費(fèi)者獨(dú)立消費(fèi)相同的消息流,靈活適應(yīng)不同的業(yè)務(wù)需求。
消費(fèi)者組的設(shè)計(jì)使 Kafka 能夠以高吞吐量和容錯(cuò)的方式支持各種規(guī)模的實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)。
10. 生產(chǎn)者分區(qū)策略
Kafka 中的生產(chǎn)者分區(qū)策略(Partitioning Strategy)決定了消息將被發(fā)送到哪個(gè)分區(qū)。分區(qū)策略直接影響消息的分布、并行處理能力和消息順序性。Kafka 生產(chǎn)者在發(fā)送消息時(shí),會(huì)根據(jù)配置和策略選擇適當(dāng)?shù)姆謪^(qū)。以下是 Kafka 常見的生產(chǎn)者分區(qū)策略:
1. 輪詢策略(Round-Robin)
策略描述:生產(chǎn)者將消息均勻地分發(fā)到所有可用分區(qū)上,按照輪詢(循環(huán))的方式發(fā)送。特點(diǎn):這種方式?jīng)]有考慮消息的內(nèi)容,只是簡(jiǎn)單地將消息均勻分布到不同分區(qū)。適用場(chǎng)景:適用于對(duì)消息順序性沒有嚴(yán)格要求的場(chǎng)景,確保各個(gè)分區(qū)負(fù)載均衡。 示例:如果一個(gè)主題有 3 個(gè)分區(qū),生產(chǎn)者按照輪詢策略依次將消息發(fā)送到分區(qū) 0、分區(qū) 1、分區(qū) 2,接著再返回分區(qū) 0。
2. 基于消息鍵的分區(qū)策略(Key-based Partitioning)
策略描述:當(dāng)消息包含鍵(key)時(shí),生產(chǎn)者會(huì)根據(jù)鍵值來決定消息發(fā)送到哪個(gè)分區(qū)。具體實(shí)現(xiàn)中,Kafka 會(huì)使用消息鍵的哈希值來選擇分區(qū):partition = hash(key) % number_of_partitions。特點(diǎn):基于相同鍵的消息會(huì)被發(fā)送到同一個(gè)分區(qū),從而確保這些消息在該分區(qū)內(nèi)保持順序。不同鍵的消息會(huì)分散到不同的分區(qū)。適用場(chǎng)景:適合需要按鍵值保持消息順序的場(chǎng)景,確保同一鍵值的消息總是發(fā)送到相同的分區(qū)。 示例:如果一個(gè)用戶 ID 被作為消息鍵,所有與該用戶相關(guān)的消息會(huì)被發(fā)送到同一個(gè)分區(qū)。這在需要保證用戶級(jí)別的順序性時(shí)非常有用。
3. 自定義分區(qū)策略(Custom Partitioning Strategy)
策略描述:Kafka 允許用戶實(shí)現(xiàn)自己的分區(qū)策略,即可以通過實(shí)現(xiàn) Kafka 的 Partitioner 接口來自定義分區(qū)選擇邏輯。特點(diǎn):靈活性最高,用戶可以根據(jù)特定的業(yè)務(wù)需求實(shí)現(xiàn)復(fù)雜的分區(qū)邏輯。適用場(chǎng)景:適用于需要根據(jù)特定業(yè)務(wù)邏輯進(jìn)行分區(qū)選擇的場(chǎng)景。例如,按照數(shù)據(jù)時(shí)間戳、區(qū)域、優(yōu)先級(jí)等來分區(qū)。 示例:可以自定義一個(gè)分區(qū)器,將高優(yōu)先級(jí)的消息發(fā)送到特定的分區(qū),而低優(yōu)先級(jí)的消息分布到其他分區(qū)。
4. 粘性分區(qū)策略(Sticky Partitioning)
策略描述:Kafka 從 2.4.0 開始引入了一種新的默認(rèn)分區(qū)策略,稱為粘性分區(qū)器。在此策略下,生產(chǎn)者會(huì)在一段時(shí)間內(nèi)將所有消息發(fā)送到同一個(gè)分區(qū),直到該分區(qū)的批次被填滿或達(dá)到發(fā)送限制,然后再選擇一個(gè)新的分區(qū)。特點(diǎn):這種策略優(yōu)化了吞吐量和延遲,因?yàn)樗鼫p少了批次發(fā)送的頻率。批量發(fā)送大量消息可以提高網(wǎng)絡(luò)效率,同時(shí)減少 CPU 和內(nèi)存的消耗。適用場(chǎng)景:適用于大規(guī)模批量發(fā)送消息的場(chǎng)景,主要目的是提高吞吐量。 示例:生產(chǎn)者連續(xù)發(fā)送多條消息,直到填滿一個(gè)批次,之后再切換到下一個(gè)分區(qū)進(jìn)行發(fā)送,而不是每條消息都選擇一個(gè)新的分區(qū)。
5. 指定分區(qū)(Manual Partitioning)
策略描述:生產(chǎn)者在發(fā)送消息時(shí)可以明確指定要發(fā)送的分區(qū)。這樣,生產(chǎn)者直接控制消息進(jìn)入的分區(qū),不依賴于 Kafka 內(nèi)置的分區(qū)策略。特點(diǎn):完全由生產(chǎn)者控制消息發(fā)送的分區(qū),適用于對(duì)消息分區(qū)有特殊需求的場(chǎng)景。適用場(chǎng)景:當(dāng)生產(chǎn)者有明確的分區(qū)需求,比如根據(jù)特定業(yè)務(wù)邏輯決定哪個(gè)分區(qū)處理哪些消息時(shí),可以使用該策略。
6. 默認(rèn)分區(qū)策略(Default Partitioning)
策略描述:如果消息沒有帶鍵且生產(chǎn)者沒有顯式指定分區(qū),Kafka 使用默認(rèn)的分區(qū)策略(Kafka 2.4 之前是輪詢分區(qū),Kafka 2.4 之后是粘性分區(qū))。特點(diǎn):適用于沒有特殊分區(qū)要求的場(chǎng)景,如果未指定分區(qū)或鍵,Kafka 會(huì)采用一種默認(rèn)的分區(qū)策略分配消息。適用場(chǎng)景:生產(chǎn)者不關(guān)心消息的分區(qū)分配,且沒有特定分區(qū)策略的場(chǎng)景下,使用默認(rèn)策略即可。
7. 隨機(jī)分區(qū)策略(Random Partitioning)
Kafka 默認(rèn)分區(qū)策略不包含“隨機(jī)分區(qū)策略”,但可以通過自定義分區(qū)器來實(shí)現(xiàn)隨機(jī)分區(qū)。生產(chǎn)者的默認(rèn)策略不會(huì)隨機(jī)選擇分區(qū),而是依賴于鍵的哈希值或使用輪詢/粘性策略來分配分區(qū)。
11. 批次的概念
在 Kafka 中,生產(chǎn)者發(fā)送消息時(shí)會(huì)使用 批次(batch)的概念來優(yōu)化網(wǎng)絡(luò)傳輸和吞吐量。
批次機(jī)制允許生產(chǎn)者在發(fā)送消息時(shí),將多條消息聚合在一起,作為一個(gè)整體(批次)發(fā)送到 Kafka 的 Broker。這不僅提高了網(wǎng)絡(luò)傳輸?shù)男?,還能減少單條消息發(fā)送的開銷。
Kafka 的生產(chǎn)者會(huì)在內(nèi)存中累積消息,并在滿足一定條件時(shí),將消息作為一個(gè)批次發(fā)往目標(biāo)分區(qū)。每個(gè)批次中的消息會(huì)發(fā)送到同一個(gè)分區(qū)。
批次的觸發(fā)條件
通過配置參數(shù) batch.size 來設(shè)置每個(gè)批次的最大大小。它表示生產(chǎn)者內(nèi)存緩沖區(qū)中一個(gè)批次能夠容納的字節(jié)數(shù)。例如,batch.size 被設(shè)置為 16KB,那么生產(chǎn)者會(huì)在消息總大小達(dá)到 16KB 時(shí),將消息批量發(fā)送到指定的分區(qū)。除了批次大小,Kafka 還提供了一個(gè)參數(shù) linger.ms,它定義了生產(chǎn)者等待新消息加入批次的時(shí)間。即使批次的大小沒有達(dá)到 batch.size,當(dāng)?shù)却龝r(shí)間超過 linger.ms 的設(shè)定值時(shí),生產(chǎn)者也會(huì)將當(dāng)前累積的消息發(fā)送出去。這樣做的好處是,即使消息量較小,生產(chǎn)者也不會(huì)一直等待,能夠確保消息及時(shí)發(fā)送。例如:如果 linger.ms 設(shè)置為 5ms,生產(chǎn)者會(huì)在等待 5ms 后,無論消息是否填滿當(dāng)前批次,都會(huì)發(fā)送這些消息。如果 linger.ms 設(shè)置為 0ms(默認(rèn)值),生產(chǎn)者會(huì)盡可能快速地發(fā)送消息,而不會(huì)故意等待新的消息加入批次。
優(yōu)化參數(shù)
batch.size 和 linger.ms 是生產(chǎn)者性能優(yōu)化的重要參數(shù)。根據(jù)場(chǎng)景需求,生產(chǎn)者可以調(diào)整這些參數(shù)以找到合適的平衡:
如果吞吐量是關(guān)鍵,可以增加 batch.size 以容納更多消息,同時(shí)設(shè)置一個(gè)較大的 linger.ms 讓生產(chǎn)者有足夠時(shí)間批量發(fā)送。如果延遲要求較高(需要盡快發(fā)送消息),可以將 linger.ms 設(shè)置較小,甚至為 0,讓生產(chǎn)者盡量實(shí)時(shí)發(fā)送消息。
批次的發(fā)送流程
當(dāng)生產(chǎn)者發(fā)送消息時(shí),會(huì)將消息先寫入生產(chǎn)者內(nèi)存緩沖區(qū)(buffer.memory)。在達(dá)到 batch.size 或等待時(shí)間 linger.ms 過期時(shí),生產(chǎn)者會(huì)將這些消息打包為一個(gè)批次,發(fā)送到相應(yīng)的分區(qū)。如果緩沖區(qū)中的數(shù)據(jù)沒有達(dá)到 batch.size,但 linger.ms 已經(jīng)到期,生產(chǎn)者也會(huì)強(qiáng)制將未滿的批次發(fā)送出去。
總結(jié)
Kafka 發(fā)送消息時(shí),批次(batch)是為了提高網(wǎng)絡(luò)效率,將多條消息聚合在一起作為一個(gè)批次發(fā)送。批次可以通過兩種方式觸發(fā):
批次大小達(dá)到設(shè)定值(batch.size)。等待時(shí)間超過設(shè)定的超時(shí)時(shí)間(linger.ms)。
這種機(jī)制有效地提高了吞吐量和網(wǎng)絡(luò)資源的利用率,同時(shí)在低延遲和高吞吐量之間提供了靈活的權(quán)衡選擇。
12. 為什么同一個(gè)分區(qū)只能由消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)
在 Kafka 中,同一個(gè)分區(qū)只能由同一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者進(jìn)行消費(fèi),主要是為了保證消息的順序性和消費(fèi)狀態(tài)的一致性。這是 Kafka 消費(fèi)模型設(shè)計(jì)的核心理念之一。
1. 消息的順序性
分區(qū)內(nèi)消息的順序:Kafka 中消息的順序是在分區(qū)級(jí)別保證的。在一個(gè)分區(qū)內(nèi),消息按照生產(chǎn)者發(fā)送的順序依次存儲(chǔ),后續(xù)的消費(fèi)者會(huì)按照相同的順序讀取消息。如果允許同一個(gè)分區(qū)被多個(gè)消費(fèi)者組內(nèi)的消費(fèi)者并行消費(fèi),那么多個(gè)消費(fèi)者會(huì)同時(shí)從該分區(qū)拉取消息,導(dǎo)致無法確保這些消息按生產(chǎn)順序被處理。例如,如果兩條消息 M1 和 M2 被不同的消費(fèi)者同時(shí)消費(fèi),M2 可能在 M1 之前被處理,這會(huì)打亂分區(qū)內(nèi)消息的順序,進(jìn)而違背 Kafka 對(duì)消息順序的承諾。因此,為了保持分區(qū)內(nèi)消息處理的順序,Kafka 設(shè)計(jì)了同一分區(qū)只能由消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)的規(guī)則。這里也可以看出在 kafka 中消息的順序保證在同一個(gè)消費(fèi)者組和同一個(gè)分區(qū)的前提下。
2. 消費(fèi)狀態(tài)的一致性
Offset(偏移量)追蹤:Kafka 中,消費(fèi)者在消費(fèi)分區(qū)中的消息時(shí),會(huì)記錄已經(jīng)消費(fèi)的消息位置,稱為偏移量(offset)。Kafka 依賴消費(fèi)者提交的偏移量來管理消費(fèi)進(jìn)度。如果同一個(gè)分區(qū)允許被多個(gè)消費(fèi)者組中的消費(fèi)者同時(shí)消費(fèi),它們會(huì)同時(shí)更新和提交各自的偏移量,這樣 Kafka 會(huì)面臨追蹤每個(gè)消費(fèi)者消費(fèi)進(jìn)度的一致性問題。不同消費(fèi)者提交的 offset 可能會(huì)導(dǎo)致消息重復(fù)消費(fèi)或漏消費(fèi),無法有效管理消費(fèi)狀態(tài)。通過讓同一分區(qū)只能由消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi),Kafka 能確保這個(gè)消費(fèi)者是該分區(qū)消費(fèi)進(jìn)度的唯一維護(hù)者,避免了偏移量混亂的問題,保證了消費(fèi)進(jìn)度的清晰和一致。
3. 消費(fèi)者組的負(fù)載均衡
Kafka 通過消費(fèi)者組實(shí)現(xiàn)了分區(qū)的負(fù)載均衡。一個(gè)消費(fèi)者組可以包含多個(gè)消費(fèi)者,每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)一部分分區(qū)。分區(qū)的分配方式為:每個(gè)消費(fèi)者組中的消費(fèi)者獨(dú)占一個(gè)或多個(gè)分區(qū)進(jìn)行消費(fèi)。如果允許同一個(gè)分區(qū)被同一個(gè)消費(fèi)者組中的多個(gè)消費(fèi)者并行消費(fèi),會(huì)打破這種負(fù)載均衡機(jī)制,增加分區(qū)的重復(fù)處理,降低系統(tǒng)效率。消費(fèi)者組機(jī)制通過確保每個(gè)分區(qū)只分配給一個(gè)消費(fèi)者,可以將分區(qū)均勻分配到消費(fèi)者,最大化利用消費(fèi)者資源,并確保分區(qū)之間的消息處理不重疊。
4. 消息處理的冪等性和一致性
在某些場(chǎng)景下,Kafka 的消費(fèi)者端需要確保消息處理的冪等性,即每條消息只被處理一次。如果允許同一個(gè)分區(qū)被多個(gè)消費(fèi)者同時(shí)消費(fèi),可能會(huì)導(dǎo)致同一條消息被處理多次,進(jìn)而導(dǎo)致數(shù)據(jù)不一致或系統(tǒng)故障。為了避免這些問題,Kafka 保證了每個(gè)分區(qū)只能被消費(fèi)者組內(nèi)的一個(gè)消費(fèi)者處理,從而確保每條消息只被消費(fèi)一次。
5. 消費(fèi)模型的清晰性
Kafka 的消費(fèi)模型設(shè)計(jì)為分區(qū)-消費(fèi)者映射,這種設(shè)計(jì)簡(jiǎn)化了分區(qū)管理的復(fù)雜度。每個(gè)消費(fèi)者負(fù)責(zé)獨(dú)立的分區(qū),消費(fèi)者之間沒有競(jìng)爭(zhēng)關(guān)系,能夠避免對(duì)同一分區(qū)的競(jìng)爭(zhēng)消費(fèi)和消息重復(fù)處理的問題。通過這一設(shè)計(jì),Kafka 能夠提供一種非常清晰的消費(fèi)模型:每個(gè)分區(qū)有唯一的消費(fèi)者負(fù)責(zé),消息消費(fèi)的狀態(tài)(如 offset 提交)和順序都得到嚴(yán)格控制。
總結(jié)
在 Kafka 中,同一個(gè)分區(qū)只能由消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi),是為了實(shí)現(xiàn)以下幾個(gè)關(guān)鍵目標(biāo):
保持消息的順序性:分區(qū)內(nèi)的消息必須按照生產(chǎn)者的順序被消費(fèi)者消費(fèi),確保應(yīng)用邏輯的正確性。保證消費(fèi)進(jìn)度的一致性:防止多個(gè)消費(fèi)者對(duì)同一分區(qū)的 offset 進(jìn)行競(jìng)爭(zhēng)性提交,避免重復(fù)或漏消費(fèi)問題。支持負(fù)載均衡:消費(fèi)者組實(shí)現(xiàn)了分區(qū)的負(fù)載均衡,確保消費(fèi)者組內(nèi)的消費(fèi)者能夠高效分配任務(wù)。簡(jiǎn)化消費(fèi)模型:清晰、簡(jiǎn)潔的分區(qū)-消費(fèi)者映射規(guī)則簡(jiǎn)化了 Kafka 的消費(fèi)管理。
通過這種設(shè)計(jì),Kafka 能夠在消息順序、系統(tǒng)效率和一致性之間取得平衡,使得它成為高性能、可靠的消息傳遞系統(tǒng)。
13. 主副本機(jī)制
Kafka中副本分為:Leader 和 Follower。生產(chǎn)者只會(huì)把數(shù)據(jù)發(fā)往 Leader,然后 Follower 找 Leader 進(jìn)行同步數(shù)據(jù)。分區(qū)中的所有副本統(tǒng)稱為 AR(Assigned Repllicas)和 Leader 保持同步的 Follower 集合叫做ISR。如果 Follower 長(zhǎng)時(shí)間未向 Leader 發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該 Follower 將被踢出 ISR。該時(shí)間閾值由 replica.lag.time.max.ms參數(shù)設(shè)定,默認(rèn) 30s。 副本一旦重新恢復(fù)并追上主副本的數(shù)據(jù)進(jìn)度,它可以被重新加入 ISR 集合。Leader 發(fā)生故障之后,就會(huì)從 ISR 中選舉新的 Leader。選舉規(guī)則為在ISR中存活為前提,按照AR中排在前面的優(yōu)先。OSR:表示 Follower 與 Leader 副本同步時(shí),延遲過多的副本AR = ISR + OSR
14. 同一個(gè)分區(qū)被不同消費(fèi)者組中的消費(fèi)者同時(shí)消費(fèi)
在 Kafka 中,同一個(gè)分區(qū)可以被不同消費(fèi)者組中的消費(fèi)者同時(shí)消費(fèi)。
1. 消費(fèi)者組的概念
消費(fèi)者組是 Kafka 中的一種邏輯概念,表示一組共同消費(fèi)某個(gè)或某些主題的消費(fèi)者。每個(gè)消費(fèi)者組會(huì)維護(hù)一個(gè)獨(dú)立的消費(fèi)進(jìn)度(offset),以便能夠獨(dú)立地處理消息。
2. 同一分區(qū)的消費(fèi)
在 Kafka 中,同一個(gè)主題的每個(gè)分區(qū)只能被同一消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。這意味著,若某個(gè)消費(fèi)者組中的消費(fèi)者正在消費(fèi)一個(gè)分區(qū),那么該分區(qū)的消息不會(huì)被該消費(fèi)者組中的其他消費(fèi)者消費(fèi)。然而,不同的消費(fèi)者組可以同時(shí)消費(fèi)同一個(gè)分區(qū)的消息。這使得不同的應(yīng)用程序或服務(wù)可以獨(dú)立地處理相同的消息流,從而滿足不同的業(yè)務(wù)需求。
3. 工作機(jī)制
消息分發(fā):當(dāng)消息發(fā)布到一個(gè)主題的某個(gè)分區(qū)時(shí),Kafka 會(huì)將該消息傳遞給正在消費(fèi)該分區(qū)的消費(fèi)者組中的一個(gè)消費(fèi)者。如果另一個(gè)消費(fèi)者組也在消費(fèi)同一主題的相同分區(qū),則該消息也會(huì)被傳遞給這個(gè)消費(fèi)者組的消費(fèi)者。并行處理:這種設(shè)計(jì)使得可以有多個(gè)消費(fèi)者組并行處理相同的消息流。例如,一個(gè)消費(fèi)者組可以用于實(shí)時(shí)數(shù)據(jù)處理,而另一個(gè)消費(fèi)者組可以用于數(shù)據(jù)持久化或批處理。
4. 示例場(chǎng)景
實(shí)時(shí)處理:例如,假設(shè)有一個(gè)消息主題 orders,該主題有多個(gè)消費(fèi)者組:
消費(fèi)者組 A 可能用于實(shí)時(shí)訂單處理,消費(fèi)訂單消息進(jìn)行交易決策。消費(fèi)者組 B 可能用于分析訂單數(shù)據(jù),消費(fèi)相同的訂單消息進(jìn)行統(tǒng)計(jì)分析。 在這種情況下,兩個(gè)消費(fèi)者組可以獨(dú)立地處理相同的訂單消息,滿足不同的業(yè)務(wù)需求。
總結(jié) 在 Kafka 中,同一個(gè)分區(qū)可以被不同消費(fèi)者組中的消費(fèi)者同時(shí)消費(fèi)。這種設(shè)計(jì)使得 Kafka 能夠靈活地滿足多種應(yīng)用場(chǎng)景,允許多個(gè)消費(fèi)者組獨(dú)立消費(fèi)相同的消息流,從而實(shí)現(xiàn)高效的數(shù)據(jù)處理和分析。
15. 冪等性
在 Kafka 中,冪等性(Idempotence)是指生產(chǎn)者在發(fā)送消息時(shí),即使由于網(wǎng)絡(luò)問題或其他原因?qū)е孪⒅貜?fù)發(fā)送,Kafka 也能確保這些重復(fù)消息只被處理一次,不會(huì)對(duì)數(shù)據(jù)造成影響。
1. 冪等生產(chǎn)者的設(shè)計(jì) Kafka 引入了冪等生產(chǎn)者的概念,允許生產(chǎn)者在發(fā)送消息時(shí)標(biāo)識(shí)每條消息的唯一性。具體而言,冪等性生產(chǎn)者具有以下特性:
生產(chǎn)者 ID(Producer ID):每個(gè)冪等生產(chǎn)者在創(chuàng)建時(shí)會(huì)被分配一個(gè)唯一的生產(chǎn)者 ID。序列號(hào)(Sequence Number):每個(gè)生產(chǎn)者在發(fā)送每條消息時(shí)會(huì)為該消息分配一個(gè)遞增的序列號(hào)。序列號(hào)的增加是與生產(chǎn)者 ID 相關(guān)聯(lián)的。
2. 確保消息唯一性
在生產(chǎn)者發(fā)送消息時(shí),Kafka 會(huì)使用生產(chǎn)者 ID 和序列號(hào)來確保消息的唯一性。具體流程如下:
發(fā)送消息:生產(chǎn)者發(fā)送消息到 Kafka 分區(qū)時(shí),會(huì)附帶其生產(chǎn)者 ID 和當(dāng)前的序列號(hào)。服務(wù)器端處理:Kafka 服務(wù)器接收到消息后,會(huì)檢查消息的生產(chǎn)者 ID 和序列號(hào)。
如果該生產(chǎn)者 ID 和序列號(hào)組合在該分區(qū)中未出現(xiàn)過,Kafka 會(huì)將消息寫入日志。如果該組合已存在,則表示該消息是重復(fù)發(fā)送的,Kafka 將忽略這條消息,不會(huì)再次寫入。
3. 開啟冪等性特性 要啟用冪等性,生產(chǎn)者在配置時(shí)需要將 enable.idempotence 參數(shù)設(shè)置為 true。啟用后,生產(chǎn)者會(huì)自動(dòng)處理冪等性相關(guān)的配置,如:
自動(dòng)設(shè)置合適的 acks(一般設(shè)置為 all)以確保消息寫入成功。確保生產(chǎn)者在每次發(fā)送消息時(shí)不會(huì)有重試次數(shù)的限制,保證序列號(hào)的遞增。
4. 生產(chǎn)者重試機(jī)制 Kafka 的冪等性還與生產(chǎn)者的重試機(jī)制有關(guān)。當(dāng)生產(chǎn)者發(fā)送消息時(shí),如果未收到確認(rèn)(例如,由于網(wǎng)絡(luò)中斷),它可能會(huì)重試發(fā)送。這種情況下,生產(chǎn)者仍然能夠保持冪等性,因?yàn)椋?/p>
每次重試都會(huì)使用相同的生產(chǎn)者 ID 和序列號(hào),Kafka 服務(wù)器能夠識(shí)別出這些消息是重復(fù)的,因此不會(huì)重復(fù)寫入。這樣,即使由于重試導(dǎo)致同一消息被多次發(fā)送,最終只會(huì)保留一次寫入。
5. 整體的消費(fèi)冪等性 除了生產(chǎn)者的冪等性外,消費(fèi)者處理消息時(shí)的冪等性也很重要。一般來說,消費(fèi)者應(yīng)該在處理消息時(shí)實(shí)現(xiàn)冪等性,例如在數(shù)據(jù)庫中執(zhí)行插入操作時(shí)使用唯一鍵,確保即使同一消息被處理多次,也不會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。
6. 重啟
在 Kafka 中,重啟并不會(huì)導(dǎo)致冪等性喪失,但需要正確管理生產(chǎn)者的狀態(tài)。
狀態(tài)持久化:當(dāng)冪等生產(chǎn)者被重啟時(shí),它會(huì)嘗試恢復(fù)之前的狀態(tài)(即生產(chǎn)者 ID 和最新的序列號(hào))。如果生產(chǎn)者之前的狀態(tài)被保存并在重啟后能夠恢復(fù),冪等性就仍然可以得到保障。啟用冪等性:在重啟后,生產(chǎn)者需要繼續(xù)配置 enable.idempotence=true。如果該設(shè)置丟失,冪等性保障將不再有效。生產(chǎn)者 ID 恢復(fù):當(dāng)冪等生產(chǎn)者重啟時(shí),它必須能夠重新獲取其生產(chǎn)者 ID。Kafka 會(huì)在內(nèi)部管理這個(gè) ID 的分配,確保其在同一進(jìn)程中是唯一的。網(wǎng)絡(luò)分區(qū):在某些情況下,網(wǎng)絡(luò)分區(qū)可能導(dǎo)致部分消息丟失或未被確認(rèn)。在這種情況下,重啟后,生產(chǎn)者可能會(huì)重新發(fā)送那些未確認(rèn)的消息,導(dǎo)致數(shù)據(jù)重復(fù),但這是因?yàn)橄⒈恢卦?,而不是因?yàn)閮绲刃詥适АO㈨樞颍涸谥貑⑵陂g,生產(chǎn)者可能會(huì)遇到消息順序的變化,特別是在多個(gè)生產(chǎn)者同時(shí)工作時(shí),但這并不會(huì)影響冪等性本身。
小結(jié) Kafka 通過冪等生產(chǎn)者設(shè)計(jì)、使用生產(chǎn)者 ID 和序列號(hào)、配置項(xiàng)的啟用以及重試機(jī)制來實(shí)現(xiàn)消息的冪等性。通過這種方式,Kafka 能夠確保即使在網(wǎng)絡(luò)問題或重試的情況下,消息也不會(huì)重復(fù)處理,保障數(shù)據(jù)的一致性和可靠性。
16. 事務(wù)
1. 事務(wù)性生產(chǎn)
Kafka 事務(wù)性生產(chǎn) 是 Kafka 提供的一種機(jī)制,旨在確保消息的原子性和一致性,使生產(chǎn)者能夠以事務(wù)的方式發(fā)送多條消息。通過事務(wù)性生產(chǎn),生產(chǎn)者可以確保一組消息要么全部成功寫入 Kafka,要么全部失敗,從而避免數(shù)據(jù)的不一致。
1. 基本概念
原子性:在事務(wù)中發(fā)送的所有消息要么全部提交,要么全部回滾,確保不會(huì)出現(xiàn)部分消息已發(fā)送、部分消息未發(fā)送的情況。一致性:在任何時(shí)刻,消費(fèi)者只能看到完整的、已提交的消息狀態(tài),避免看到不完整的數(shù)據(jù)。
2. 關(guān)鍵步驟
初始化事務(wù):
在發(fā)送消息之前,生產(chǎn)者需要調(diào)用 initTransactions() 方法來初始化事務(wù)管理器,為后續(xù)的事務(wù)操作做好準(zhǔn)備。 開始事務(wù):
使用 beginTransaction() 方法開始一個(gè)新的事務(wù)。在此期間,發(fā)送的消息不會(huì)被消費(fèi)者看到。 發(fā)送消息:
在事務(wù)進(jìn)行期間,生產(chǎn)者可以發(fā)送多條消息。即使在消息發(fā)送的過程中發(fā)生了錯(cuò)誤,事務(wù)也可以確保這些消息不會(huì)被永久寫入 Kafka。 提交事務(wù):
調(diào)用 commitTransaction() 方法提交事務(wù)。這將使事務(wù)中的所有消息持久化,消費(fèi)者可以看到這些消息。 中止事務(wù):
如果在發(fā)送消息的過程中發(fā)生了錯(cuò)誤,生產(chǎn)者可以調(diào)用 abortTransaction() 方法中止事務(wù),從而丟棄事務(wù)中的所有消息,確保不會(huì)有部分消息被寫入。
3. 事務(wù)性生產(chǎn)的配置
為了使用事務(wù)性生產(chǎn),生產(chǎn)者需要進(jìn)行以下配置:
transactional.id:必須為生產(chǎn)者設(shè)置一個(gè)唯一的 transactional.id,以標(biāo)識(shí)該生產(chǎn)者。Kafka 會(huì)使用這個(gè) ID 來管理事務(wù)的狀態(tài)。enable.idempotence:應(yīng)設(shè)置為 true,以啟用冪等性,確保即使發(fā)生重試,也不會(huì)導(dǎo)致重復(fù)消息。
4. 事務(wù)的隔離級(jí)別
在使用事務(wù)性生產(chǎn)時(shí),消費(fèi)者可以根據(jù)配置的隔離級(jí)別來控制讀取消息的方式:
read_committed:消費(fèi)者只能讀取已提交的消息,確保數(shù)據(jù)的一致性。read_uncommitted:消費(fèi)者可以讀取所有消息,包括未提交的消息,這可能導(dǎo)致不一致性。
5. 事務(wù)的故障處理
Kafka 能夠處理事務(wù)中的故障情況,以確保數(shù)據(jù)的一致性:
如果生產(chǎn)者在提交事務(wù)之前崩潰,Kafka 會(huì)根據(jù)事務(wù) ID 和狀態(tài)自動(dòng)處理,確保未提交的消息不會(huì)對(duì)消費(fèi)者可見。如果生產(chǎn)者在提交事務(wù)時(shí)遇到錯(cuò)誤,Kafka 會(huì)確保事務(wù)被中止,從而避免數(shù)據(jù)的不一致。
6. 使用場(chǎng)景
事務(wù)性生產(chǎn)適用于以下場(chǎng)景:
跨主題或跨分區(qū)消息處理:當(dāng)需要將多個(gè)主題或多個(gè)分區(qū)的消息作為一個(gè)整體進(jìn)行處理時(shí)。事件流處理:在事件流處理和數(shù)據(jù)管道中,確保數(shù)據(jù)的一致性和可靠性。金融交易:在處理金融交易等需要高一致性的應(yīng)用中,確保所有相關(guān)消息的原子性。
小結(jié)
Kafka 的事務(wù)性生產(chǎn)機(jī)制允許生產(chǎn)者以原子方式發(fā)送多條消息,確保消息的一致性和可靠性。通過正確配置和使用事務(wù),Kafka 能夠處理復(fù)雜的消息傳遞場(chǎng)景,確保系統(tǒng)的穩(wěn)定性和數(shù)據(jù)的一致性。事務(wù)性生產(chǎn)通過初始化、開始、發(fā)送、提交和中止等步驟,提供了一種安全的消息處理方式,使得生產(chǎn)者能夠在高負(fù)載和故障情況下依然保持?jǐn)?shù)據(jù)的完整性。
2. 事務(wù)性消費(fèi)
在 Kafka 中,沒有專門的“事務(wù)性消費(fèi)”這一術(shù)語,但消費(fèi)者可以通過配置和管理消息處理邏輯來實(shí)現(xiàn)類似于事務(wù)的效果,以確保消息消費(fèi)過程的一致性和可靠性。
1. 事務(wù)性消費(fèi)的基本概念
事務(wù)性消費(fèi)指的是消費(fèi)者在處理消息時(shí)能夠確保處理的原子性和一致性,尤其是在處理過程中可能涉及多個(gè)操作或多個(gè)消息的情況下。它通常涉及以下方面:
確保只有在成功處理消息后才提交 offset:這樣可以避免消息丟失或重復(fù)處理。在處理多個(gè)消息時(shí),確保所有相關(guān)消息的狀態(tài)一致。
2. 實(shí)現(xiàn)事務(wù)性消費(fèi)的關(guān)鍵步驟
消費(fèi)者可以通過以下步驟實(shí)現(xiàn)類似于事務(wù)的效果:
手動(dòng)提交 offset
消費(fèi)者可以選擇手動(dòng)提交 offset,而不是自動(dòng)提交,以控制何時(shí)將消息標(biāo)記為已消費(fèi)。這允許消費(fèi)者在處理消息時(shí)確保只有在成功處理后才提交 offset。例如,使用 commitSync() 或 commitAsync() 方法來手動(dòng)提交 offset。 處理消息的冪等性
在處理過程中,消費(fèi)者可以實(shí)現(xiàn)冪等性邏輯,確保即使同一條消息被多次處理,也不會(huì)導(dǎo)致數(shù)據(jù)不一致。這通常涉及在應(yīng)用程序中維護(hù)狀態(tài),例如通過唯一標(biāo)識(shí)符來跟蹤已處理的消息。 設(shè)置隔離級(jí)別
消費(fèi)者可以通過設(shè)置 isolation.level 參數(shù)來控制讀取消息的行為:
read_committed:消費(fèi)者只能讀取已提交的消息,確保數(shù)據(jù)的一致性。read_uncommitted:消費(fèi)者可以讀取所有消息,包括未提交的消息,這可能導(dǎo)致不一致性。
3. 故障處理與恢復(fù)
當(dāng)消費(fèi)者處理消息時(shí),可能會(huì)發(fā)生錯(cuò)誤或故障。通過手動(dòng)提交 offset,消費(fèi)者可以確保在恢復(fù)后從未提交的消息位置繼續(xù)處理,避免消息丟失。例如,如果消費(fèi)者在處理某條消息時(shí)崩潰,下一次消費(fèi)時(shí)可以從上次成功提交的 offset 開始。
4. 事務(wù)的使用場(chǎng)景
事務(wù)性消費(fèi)適用于以下場(chǎng)景:
數(shù)據(jù)一致性要求高的應(yīng)用:如金融交易、訂單處理等場(chǎng)景,確保每個(gè)消息的處理結(jié)果都是一致的??缍鄠€(gè)消息的處理:當(dāng)處理一組相關(guān)消息時(shí),確保它們的狀態(tài)一致,避免部分處理導(dǎo)致的不一致。
小結(jié) 盡管 Kafka 中沒有專門的“事務(wù)性消費(fèi)”概念,但消費(fèi)者可以通過手動(dòng)提交 offset、實(shí)現(xiàn)冪等性處理和設(shè)置隔離級(jí)別來實(shí)現(xiàn)類似于事務(wù)的效果。這種方式確保了消息消費(fèi)過程的一致性和可靠性,尤其在需要處理多個(gè)相關(guān)消息的場(chǎng)景下,能夠有效避免消息丟失或重復(fù)處理的問題。通過合理配置和管理,消費(fèi)者能夠在復(fù)雜的應(yīng)用場(chǎng)景中實(shí)現(xiàn)高一致性的消息處理。
17. 精確一次
Kafka 中的“精確一次”(Exactly Once)語義主要是通過以下幾個(gè)關(guān)鍵機(jī)制來實(shí)現(xiàn)的:
1. 冪等性(Idempotence)
冪等性生產(chǎn)者:通過啟用生產(chǎn)者的冪等性(設(shè)置 enable.idempotence=true),可以確保即使在網(wǎng)絡(luò)故障或重試的情況下,消息不會(huì)被重復(fù)寫入。這意味著無論生產(chǎn)者嘗試發(fā)送多少次相同的消息,最終只會(huì)寫入一條消息,避免重復(fù)數(shù)據(jù)。
2. 至少一次交付(At Least Once Delivery)
消息確認(rèn)(Acks):通過將生產(chǎn)者的確認(rèn)級(jí)別設(shè)置為 acks=all(即 acks=-1),可以確保所有副本都確認(rèn)收到消息,增強(qiáng)了消息的可靠性。副本數(shù)量:設(shè)置分區(qū)的副本數(shù)(Replication Factor)大于或等于2,以確保在主副本失敗時(shí),仍然有備份副本可供使用。
3. ISR(In-Sync Replicas)
最小 ISR 數(shù)量:通過設(shè)置最小 ISR 副本數(shù)量(min.insync.replicas)為 2,確保只有在至少有兩個(gè)副本(包括主副本)處于同步狀態(tài)時(shí),生產(chǎn)者才能成功提交消息。這進(jìn)一步增強(qiáng)了消息的一致性和可用性。
綜合解釋
在 Kafka 中,實(shí)現(xiàn)“精確一次”語義的具體方式如下:
啟用冪等性,以避免重復(fù)消息。配置生產(chǎn)者以確保 acks=all,確保所有副本都收到消息。確保分區(qū)具有多個(gè)副本,并設(shè)置合理的最小 ISR 數(shù)量,以確保即使在故障情況下也能保證數(shù)據(jù)的一致性。
18. 高低水位
在 Kafka 中,高水位(High Watermark,HWM)和低水位(Low Watermark,LWM)是用來管理消息在分區(qū)中的可見性和消費(fèi)者消費(fèi)進(jìn)度的重要概念。它們有助于確保數(shù)據(jù)的一致性和可靠性。
1. 高水位(High Watermark,HWM)
定義:高水位是指一個(gè)分區(qū)中所有副本(包括主副本和備份副本)已經(jīng)成功寫入的最新消息的位移(offset)。換句話說,高水位表示所有 ISR(In-Sync Replicas,處于同步狀態(tài)的副本)都確認(rèn)收到的最新消息的位移。作用:
消息可見性:消費(fèi)者只能消費(fèi)到高水位之前的消息。只有當(dāng)消息達(dá)到高水位時(shí),消費(fèi)者才會(huì)看到這些消息,確保消費(fèi)者讀取的是已確認(rèn)的數(shù)據(jù)。故障恢復(fù):在主副本發(fā)生故障時(shí),Kafka 會(huì)選舉新的主副本,而新的主副本的位移不會(huì)超過高水位,從而確保數(shù)據(jù)的一致性。
2. 低水位(Low Watermark,LWM)
定義:低水位是指在一個(gè)分區(qū)中,消費(fèi)者可以安全地提交的最小位移。即所有 ISR 中的副本都已經(jīng)成功接收到的消息的位移。作用:
影響消費(fèi)者提交:如果消費(fèi)者提交的位移低于低水位,可能會(huì)導(dǎo)致重復(fù)消費(fèi)。因此,Kafka 會(huì)阻止消費(fèi)者提交低于低水位的位移。維護(hù)數(shù)據(jù)完整性:低水位的概念有助于確保在所有副本都同步的情況下,消費(fèi)者的消費(fèi)進(jìn)度重復(fù)消費(fèi)。
3. 高水位與低水位的關(guān)系
高水位與低水位的區(qū)別:高水位表示當(dāng)前可用的最新消息的位移,而低水位表示消費(fèi)者可以安全提交的最低位移。高水位通常高于或等于低水位。影響消費(fèi)者行為:消費(fèi)者在消費(fèi)消息時(shí),必須注意低水位,以確保在處理消息時(shí)不會(huì)導(dǎo)致數(shù)據(jù)不一致。
4. 總結(jié) 高水位和低水位是 Kafka 中用于管理消息一致性和消費(fèi)者消費(fèi)進(jìn)度的重要機(jī)制。高水位確保消費(fèi)者只能消費(fèi)到已確認(rèn)的消息,而低水位確保消費(fèi)者在提交位移時(shí)不會(huì)導(dǎo)致數(shù)據(jù)丟失。通過這兩個(gè)概念,Kafka 能夠?qū)崿F(xiàn)高可靠性和一致性的消息傳遞。
19. 示例代碼
在 Spring Boot 項(xiàng)目中,實(shí)現(xiàn) Kafka 集群中的生產(chǎn)者和消費(fèi)者組(有 3 個(gè)消費(fèi)者并行消費(fèi) 3 個(gè)分區(qū)的消息)。
1. 配置 Kafka 集群
spring:
kafka:
bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
consumer:
group-id: consumer-group-1 # 消費(fèi)者組 ID
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
Kafka 配置文件中的 auto-offset-reset: earliest 是用于控制消費(fèi)者在找不到先前提交的位移(offset)時(shí),該如何處理的配置選項(xiàng)。這個(gè)設(shè)置決定了消費(fèi)者在首次訂閱一個(gè)主題,或者在之前的位移記錄不可用時(shí),應(yīng)該從哪里開始消費(fèi)消息。earliest 表示從最早的可用消息開始消費(fèi)。如果找不到之前提交的位移(比如新消費(fèi)者組,或者之前的偏移量數(shù)據(jù)已經(jīng)過期),消費(fèi)者將從該主題中最早的消息開始消費(fèi)(即從偏移量 0 開始)。這個(gè)設(shè)置可以確保消費(fèi)者不會(huì)漏掉任何消息,尤其是在消費(fèi)者組第一次消費(fèi)主題或偏移量丟失的情況下。
常見配置選項(xiàng)
earliest:從最早的消息開始消費(fèi)(偏移量為 0)。latest:從最新的消息開始消費(fèi),即只消費(fèi)之后發(fā)布的消息。忽略之前發(fā)布的消息。
場(chǎng)景應(yīng)用
earliest:適用于需要處理主題中的所有消息的場(chǎng)景。比如系統(tǒng)首次上線時(shí),或消費(fèi)者希望從頭處理所有數(shù)據(jù)。latest:適用于只關(guān)心未來消息的場(chǎng)景,不需要消費(fèi)已經(jīng)發(fā)布的歷史消息。
2. 創(chuàng)建生產(chǎn)者
實(shí)現(xiàn)一個(gè) Kafka 生產(chǎn)者類 KafkaProducerService,用于發(fā)送消息到指定的 topic:
@Service
public class KafkaProducerService {
private final KafkaTemplate
public KafkaProducerService(KafkaTemplate
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Message sent: " + message);
}
}
3. 創(chuàng)建消費(fèi)者
創(chuàng)建一個(gè)消費(fèi)者類 KafkaConsumerService,用 3 個(gè)不同的消費(fèi)者并行處理 3 個(gè)分區(qū)的消息:
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "consumer-group-1", concurrency = "3")
public void listen(String message) {
System.out.println("Received message: " + message + " | Thread: " + Thread.currentThread().getName());
}
}
concurrency = “3”:配置 3 個(gè)并發(fā)線程,確保 3 個(gè)消費(fèi)者可以同時(shí)消費(fèi) 3 個(gè)分區(qū)的消息。
4. 控制器發(fā)送消息
創(chuàng)建一個(gè)控制器 KafkaController,通過 REST 接口觸發(fā)生產(chǎn)者發(fā)送消息到 Kafka:
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final KafkaProducerService kafkaProducerService;
@Autowired
public KafkaController(KafkaProducerService kafkaProducerService) {
this.kafkaProducerService = kafkaProducerService;
}
@PostMapping("/send")
public String sendMessage(@RequestBody String message) {
kafkaProducerService.sendMessage("my-topic", message);
return "Message sent: " + message;
}
}
柚子快報(bào)邀請(qǐng)碼778899分享:kafka
文章鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。