柚子快報(bào)邀請(qǐng)碼778899分享:分布式 【中間件】kafka
柚子快報(bào)邀請(qǐng)碼778899分享:分布式 【中間件】kafka
目錄
一、概述二、生產(chǎn)者1. 發(fā)送原理2. 生產(chǎn)者分區(qū) Partition分區(qū)好處分區(qū)策略
3. 生產(chǎn)者如何提高吞吐量4. 數(shù)據(jù)可靠性ACK應(yīng)答級(jí)別數(shù)據(jù)不丟失:ACK + ISR數(shù)據(jù)不重復(fù):冪等性數(shù)據(jù)有序
三、broker1. 工作流程2. 副本相關(guān)3. 底層存儲(chǔ)4. 高效讀寫數(shù)據(jù)
四、消費(fèi)者1. 工作流程2. 分區(qū)分配和重平衡3. offset 位移
一、概述
定義:是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(MessageQueue),主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域 三大功能
削峰: 高峰期的消息可以積壓到消息隊(duì)列中,隨后平滑地處理完成,避免突發(fā)訪問壓力壓垮系統(tǒng)解耦: 消息隊(duì)列避免模塊之間的相互調(diào)用,降低各個(gè)模塊的耦合性,提高系統(tǒng)的可擴(kuò)展性異步: 發(fā)送方把消息放在消息隊(duì)列中,接收方無需立即處理,可以等待合適的時(shí)間處理 基礎(chǔ)架構(gòu):
組件作用Producer消息生產(chǎn)者,就是向 Kafka broker 發(fā)消息的客戶端Consumer消息消費(fèi)者,向 Kafka broker 取消息的客戶端Consumer Group(CG)消費(fèi)者組,由多個(gè) consumer 組成。組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響。消費(fèi)者組是邏輯上的一個(gè)訂閱者Broker一臺(tái) Kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè)broker 可以容納多個(gè) topicTopic消息主題(邏輯概念) ,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topicPartition一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列Replica副本。每個(gè)分區(qū)都有若干個(gè)副本,一個(gè) Leader 和若干個(gè)FollowerLeader一組副本中的“主”,只有主和生產(chǎn)者消費(fèi)者交互Follower一組副本中的“從”,實(shí)時(shí)從 Leader 中同步數(shù)據(jù),保持和Leader 數(shù)據(jù)的同步SegmentPartition 物理上被分成多個(gè) Segment,每個(gè) Segment 1個(gè)GZookeeper保存元信息,現(xiàn)已廢除
二、生產(chǎn)者
1. 發(fā)送原理
涉及到了兩個(gè)線程——main 線程和 Sender 線程
在 main 線程中創(chuàng)建了一個(gè)雙端隊(duì)列 RecordAccumulator。main 線程將消息發(fā)送給消息隊(duì)列當(dāng)消息隊(duì)列內(nèi)的消息達(dá)到一定大小,或者達(dá)到時(shí)間限制,會(huì)通知sender線程Sender 線程不斷從消息隊(duì)列中拉取消息發(fā)送到 Kafka Broker
可以選擇是異步還是同步(同步就是sender等待收到broker的ack后,再去發(fā)送新消息)
2. 生產(chǎn)者分區(qū) Partition
分區(qū)好處
便于合理使用存儲(chǔ)資源,可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)存儲(chǔ)在多臺(tái)Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù)
分區(qū)策略
生產(chǎn)者生產(chǎn)消息的時(shí)候:
指明partition的情況下,直接將指明的值作為partition值;例如partition=0,所有數(shù)據(jù)寫入分區(qū)0 沒有指明partition值但有key的情況下,將key的hash值與topic的partition數(shù)進(jìn)行取余得到partition值 例如:key1的hash值=5, key2的hash值=6 ,topic的partition數(shù)=2,那么key1 對(duì)應(yīng)的value1寫入1號(hào)分區(qū),key2對(duì)應(yīng)的value2寫入0號(hào)分區(qū) 既沒有partition值又沒有key值的情況下,Kafka采用Sticky Partition(黏性分區(qū)器),會(huì)隨機(jī)選擇一個(gè)分區(qū),并盡可能一直使用該分區(qū),待該分區(qū)的batch已滿或者已完成,Kafka再隨機(jī)一個(gè)分區(qū)進(jìn)行使用(和上一次的分區(qū)不同)。 例如:第一次隨機(jī)選擇0號(hào)分區(qū),等0號(hào)分區(qū)當(dāng)前批次滿了(默認(rèn)16k)或者linger.ms設(shè)置的時(shí)間到, Kafka再隨機(jī)一個(gè)分區(qū)進(jìn)行使用(如果還是0會(huì)繼續(xù)隨機(jī)) 自定義分區(qū):定義類實(shí)現(xiàn) Partitioner 接口,重寫 partition()方法,方法返回分區(qū)號(hào)
3. 生產(chǎn)者如何提高吞吐量
提高main線程創(chuàng)建的消息隊(duì)列大小:緩存大一點(diǎn)提高batchsize大?。憾嗟纫恍?shù)據(jù)再傳調(diào)整等待時(shí)間:雙刃劍,太短一次傳的消息太少,太長有延遲對(duì)傳輸數(shù)據(jù)做壓縮:能傳更多的消息
4. 數(shù)據(jù)可靠性
ACK應(yīng)答級(jí)別
0:生產(chǎn)者發(fā)送過來的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答 1:生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答 -1:生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader和ISR隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答
單純用0或1都會(huì)導(dǎo)致丟數(shù),而單純用-1會(huì)導(dǎo)致多數(shù)重復(fù)
數(shù)據(jù)不丟失:ACK + ISR
ACK = -1 + 副本 >= 2 + ISR最小副本數(shù)量 >= 2
數(shù)據(jù)不重復(fù):冪等性
數(shù)據(jù)語義
最多一次:ACK = 0至少一次:ACK = -1 + 副本 >= 2 + ISR最小副本數(shù)量 >= 2精確一次:冪等性 + 至少一次 重復(fù)數(shù)據(jù)的判斷標(biāo)準(zhǔn):具有
PID是Kafka每次重啟都會(huì)分配一個(gè)新的Producer IDPartition 表示分區(qū)號(hào)Sequence Number是單調(diào)自增的 所以冪等性只能保證的是在單分區(qū)單會(huì)話內(nèi)不重復(fù) 全局不重復(fù)需要開啟事務(wù)
數(shù)據(jù)有序
生產(chǎn)者有序發(fā)送消息
一個(gè)一個(gè)消息的發(fā):一個(gè) Topic 下的同一個(gè) Partition 一定是有序的不是一個(gè)一個(gè)發(fā):需要開啟冪等性且一次發(fā)不能超過5個(gè),這樣如果亂序到達(dá)的話,broker會(huì)自己排序 消費(fèi)者有序消費(fèi)
一個(gè)分區(qū)只讓一個(gè)消費(fèi)者來消費(fèi),即能保證
三、broker
1. 工作流程
生產(chǎn)者將消息發(fā)送給分區(qū) LeaderLeader 將消息寫入本地文件對(duì)應(yīng)的 Follower 從 Leader 拉取消息并寫入本地文件Follower 向 Leader 發(fā)送 ACKLeader向生產(chǎn)者回復(fù)
leader的維護(hù)由保存在paitition內(nèi)的Controller來做,Controller也是分布式的,他會(huì)監(jiān)聽brokers節(jié)點(diǎn)的變化,在節(jié)點(diǎn)掛掉的時(shí)候輔助選舉新leader,選舉規(guī)則:在ids列表內(nèi)按順序選擇
2. 副本相關(guān)
定義:每個(gè)partition都有多份,叫副本,來提高可靠性
副本分為Leader和Follower,只有Leader和生產(chǎn)者和消費(fèi)者交互副本AR = ISR + OSR Leader 和 Follower 故障處理
Follower故障:被踢出ISR,恢復(fù)后再加入ISRLeader故障:從ISR中選出一個(gè)新的Leader,恢復(fù)后去除舊數(shù)據(jù),和新Leader進(jìn)行同步(只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)) 副本分區(qū)分配 盡可能的把Leader散開,否則會(huì)對(duì)某一個(gè)broker產(chǎn)生很大的壓力
3. 底層存儲(chǔ)
partition下進(jìn)一步將數(shù)據(jù)分為Segment,每個(gè)1G
Segment分為
log:存具體數(shù)據(jù),以追加的方式index:索引,稀疏索引,4KB記一條索引時(shí)間戳:過期刪除用的 刪除方法
刪除:直接刪除壓縮:相同key只保留最新的
4. 高效讀寫數(shù)據(jù)
Kafka 本身是分布式集群,可以采用分區(qū)技術(shù),并行度高讀數(shù)據(jù)采用稀疏索引,可以快速定位要消費(fèi)的數(shù)據(jù)順序?qū)懘疟P頁緩存 + 零拷貝技術(shù)
頁緩存PageCache:重度依賴底層操作系統(tǒng)提供的PageCache功能,寫的時(shí)候直接交給頁緩存,讀的時(shí)候先讀頁緩存,沒有再讀磁盤零拷貝:消息從磁盤里讀出來之后不走應(yīng)用層代碼,直接走網(wǎng)卡,不占用CPU
四、消費(fèi)者
1. 工作流程
消費(fèi)者可以分組,一個(gè)分區(qū)只能由組內(nèi)的一個(gè)消費(fèi)者消費(fèi),消費(fèi)者組是邏輯上的一個(gè)訂閱者用offset標(biāo)識(shí)消費(fèi)的位置,由消費(fèi)者提交,保存在主題內(nèi),由coordinator管理,這也是個(gè)分布式
主要就是從broker里拉取數(shù)據(jù)
2. 分區(qū)分配和重平衡
分區(qū)分配問題:一個(gè)consumer group中有多個(gè)consumer組成,一個(gè) topic有多個(gè)partition組成,問題是,到底由哪個(gè)consumer來消費(fèi)哪個(gè)partition的數(shù)據(jù)
分區(qū)分配策略
Range:對(duì)每個(gè) topic 而言, partitions數(shù)/consumer數(shù)來決定,會(huì)產(chǎn)生數(shù)據(jù)傾斜RoundRobin:針對(duì)集群中所有Topic而言,所有的 partition輪詢分配Sticky:盡量均勻地分配分區(qū),根據(jù)上次的分配結(jié)果盡量減少變動(dòng)
3. offset 位移
位移保存方式:存在__consumer_offsets里,采用 key 和 value 的方式存儲(chǔ)數(shù)據(jù)。key 是 group.id+topic+分區(qū)號(hào),value 就是當(dāng)前 offset 的值 位移的提交方式
自動(dòng)提交(可能造成重復(fù)消費(fèi)) 重復(fù)消費(fèi):已經(jīng)消費(fèi)了數(shù)據(jù),但是 offset 沒提交 比如每隔5s,下一輪過了2s掛了,會(huì)重復(fù)消費(fèi)這2s的內(nèi)容 手動(dòng)提交(可能造成漏消費(fèi)) 漏消費(fèi):先提交 offset 后消費(fèi),有可能會(huì)造成數(shù)據(jù)的漏消費(fèi) 比如消費(fèi)者取了,還在內(nèi)存里,剛提交還沒來得及落盤就掛了,沒落盤的就漏消費(fèi)了 不管是重復(fù)消費(fèi)還是漏消費(fèi),都是提交和落盤的間隙出現(xiàn)宕機(jī)的情況,可以開啟事務(wù),把這兩個(gè)動(dòng)作原子綁定
柚子快報(bào)邀請(qǐng)碼778899分享:分布式 【中間件】kafka
參考文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。