柚子快報激活碼778899分享:分布式 Kafka的核心原理
柚子快報激活碼778899分享:分布式 Kafka的核心原理
目錄
Tpoic的分區(qū)和副本機(jī)制
分區(qū)
副本
?消息存儲機(jī)制和查詢機(jī)制
消息存儲機(jī)制
? log文件和index文件的解析
?index文件內(nèi)容基本結(jié)構(gòu)
查詢機(jī)制
Kafka中生產(chǎn)者數(shù)據(jù)分發(fā)策略
隨機(jī)分發(fā)策略
指定分區(qū)策略
Hash取模策略
自定義分區(qū)策略
輪詢分發(fā)策略 和 粘性分發(fā)策略
Kafka消費者的負(fù)載均衡機(jī)制
數(shù)據(jù)不丟失機(jī)制
生產(chǎn)端保證消息不丟失
Broker端保證數(shù)據(jù)不丟失
消費端如何保證數(shù)據(jù)不丟失
Kafka的數(shù)據(jù)積壓
通過命令的方式查看數(shù)據(jù)積壓
?數(shù)據(jù)積壓問題處理
Tpoic的分區(qū)和副本機(jī)制
分區(qū)
分區(qū)的作用:
1.避免單臺服務(wù)器容量的限制:每臺服務(wù)器的磁盤存儲空間是有限的,Topic分成多個partition分區(qū),可以避免單個partition數(shù)據(jù)大小多大,導(dǎo)致服務(wù)器無法存儲,利用多臺服務(wù)器的存儲能力,提升Topic的數(shù)據(jù)存儲條數(shù)
2.提升Topic的吞吐量(數(shù)據(jù)讀寫速度):利用多臺服務(wù)器的能力,網(wǎng)絡(luò)等資源
分區(qū)數(shù)量
分區(qū)的數(shù)量沒有限制,分區(qū)數(shù)量和Kafka集群中的broker節(jié)點個數(shù)沒有任何關(guān)系,在實際工作推薦Topic的分區(qū)數(shù)量不要超過kafka集群中的broker節(jié)點個數(shù)的3倍,這只是一個推薦/經(jīng)驗值
副本
副本作用:
通過多副本的機(jī)制,提升數(shù)據(jù)安全性,但是副本過多,會導(dǎo)致冗余(重復(fù))的數(shù)據(jù)過多
副本的數(shù)量限制
副本數(shù)量最大不能超過kafka集群中的broker節(jié)點個數(shù),在實際工作中,推薦的分區(qū)的副本數(shù)量是1-3個,具體設(shè)置多少個,根據(jù)企業(yè)的數(shù)據(jù)重要程度進(jìn)行選擇,如果數(shù)據(jù)重要,可以將副本數(shù)設(shè)置大一些,如果數(shù)據(jù)不太重要,可以將副本數(shù)設(shè)置小一些
?消息存儲機(jī)制和查詢機(jī)制
消息存儲機(jī)制
?1.Topic的數(shù)據(jù)存放路徑是:/export/server/kafka/data,在該目錄下,還有其他的目錄,而且以Topic進(jìn)行劃分,具體目錄的命名規(guī)則是:Topic名稱-分區(qū)編號
2.Topic目錄下,存放的是消息的數(shù)據(jù)文件,并且是成對出現(xiàn),也就是xx.log文件和xx.index文件
? log文件和index文件的解析
xx.log文件和xx.index文件的作用:
xx.log文件:稱之為segment片段文件,也就是一個partition分區(qū)的數(shù)據(jù),會被分成多個segment(log)片段文件進(jìn)行存儲
xx.index文件:稱之為索引文件,該文件的作用是用來加快對xx.log文件內(nèi)容檢索的速度
xx.log和xx.index文件名稱的意義:
這個數(shù)字是xx.log文件中第一條消息的offset(偏移量),offset(偏移量)從0開始編號
partition分區(qū)的數(shù)據(jù)分成多個xx.log(segment片段文件)文件進(jìn)行存儲的意義:
1.如果一個文件數(shù)據(jù)量過大,打開和關(guān)閉文件都非常消耗資源
2.在一個大文件中,檢索內(nèi)容也會非常消耗資源
3.kafka只是用來臨時存儲消息數(shù)據(jù),會定時將過期數(shù)據(jù)刪除,如果數(shù)據(jù)在一個文件中,刪除效率低,如果數(shù)據(jù)分成了多個segment片段文件進(jìn)行存儲,刪除的時候只需要判斷segment文件最后修改時間,如果超過了保留時間,就直接將整個segment文件刪除,該保留時間是通過server.properties文件中的log.retention.hours=168進(jìn)行設(shè)置的,默認(rèn)保留168小時(7天)
?index文件內(nèi)容基本結(jié)構(gòu)
查詢機(jī)制
查詢步驟:,
1.首先先確定要讀取哪個xx.log(segment片段)文件,368776該offset的消息在368769.log文件中
2.查詢xx.log對應(yīng)的xx.index,查詢該條消息的物理偏移量范圍
3.根據(jù)消息的物理偏移量范圍去讀取xx.log文件(底層是基于磁盤的順序讀取)
4.最終就獲取到了具體的消息內(nèi)容
Kafka中生產(chǎn)者數(shù)據(jù)分發(fā)策略
? 生產(chǎn)者數(shù)據(jù)分發(fā)策略指的集市生產(chǎn)者生產(chǎn)的消息是如何保存到具體分區(qū)上
分發(fā)策略如下分類:
1.隨機(jī)分發(fā)策略:將消息發(fā)到隨機(jī)某個分區(qū)上,還是發(fā)送到Leader主副本上,python支持,java不支持
2.指定分區(qū)策略:將消息發(fā)到指定分區(qū)上面python支持,java不支持
3.Hash取模策略:對消息的key先取Hash值,再和分區(qū)數(shù)取模,python支持,java不支持
4.輪詢策略:再kafka的2.4及以上版本,已經(jīng)更名成粘性分發(fā)策略,python不支持,java支持
5.自定義分發(fā)策略:,python支持,java支持
隨機(jī)分發(fā)策略
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
當(dāng)在發(fā)送數(shù)據(jù)的時候, 如果只傳遞了topic 和 value,沒有指定key的時候, 那么此時就采用隨機(jī)策略
在kafka中, 專門提供了一個默認(rèn)的分發(fā)數(shù)據(jù)的類: DefaultPartitioner ??? def __call__(cls, key, all_partitions, available): ??????? """ ?????? ??? ?如果 key為 null, 那么隨機(jī)返回一個分區(qū)的編號 ??????? """ ??????? if key is None: ??????????? if available: ??????????????? return random.choice(available) ??????????? return random.choice(all_partitions) ?? ??? ?# 后續(xù)的代碼 當(dāng)沒有key的時候,壓根就執(zhí)行不到 ??????? idx = murmur2(key) ??????? idx &= 0x7fffffff ??????? idx %= len(all_partitions) ??????? return all_partitions[idx]
指定分區(qū)策略
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
當(dāng)在發(fā)送數(shù)據(jù)的時候, 如果指定了partition參數(shù), 表示的采用指定分區(qū)的方案, 分區(qū)的編號從0開始
當(dāng)指定了partition的參數(shù)后, 與DefaultPartitioner沒有任何的關(guān)系
Hash取模策略
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
當(dāng)在發(fā)送數(shù)據(jù)的時候, 如果傳遞了topic 和 value 以及key的時候, 那么此時就是采用hash取模策略
注意: 相同key的返回的hash值是一致的, 同樣對應(yīng)分區(qū)也是同一個。也就是要注意數(shù)據(jù)傾斜的問題。
在kafka中, 專門提供了一個默認(rèn)的分發(fā)數(shù)據(jù)的類: DefaultPartitioner ??? def __call__(cls, key, all_partitions, available): ??????? """ ?????? ??? ?如果 key為 null, 那么隨機(jī)返回一個分片的編號 ??????? """ ??????? if key is None: ??????????? if available: ??????????????? return random.choice(available) ??????????? return random.choice(all_partitions) ?? ??? ?# 當(dāng)有key的時候 , 執(zhí)行下列代碼. 此處的代碼其實就是hash取模的方案 ??????? idx = murmur2(key) ??????? idx &= 0x7fffffff ??????? idx %= len(all_partitions) ??????? return all_partitions[idx]
自定義分區(qū)策略
參考DefaultPartitioner 它怎么寫你就怎么寫
# 第一步:創(chuàng)建自己的分區(qū)類 class MyPartitioner(object):
?? ?# 第二步:實現(xiàn)__call__。key:消息中的key;all_partitions:所有的分區(qū)列表;available:所有可用分區(qū)的列表 ??? @classmethod ??? def __call__(cls, key, all_partitions, available): ??????? # 第三步:分發(fā)邏輯根據(jù)自己要求進(jìn)行實現(xiàn) ?????? return 0 ????? ? # 第四步:導(dǎo)入自己的分區(qū)類 import MyPartitioner
# 第五步:調(diào)用 producer = KafkaProducer( ?? ?bootstrap_servers=['node1.itcast.cn:9092','node2.itcast.cn:9092'], ?? ?partitioner=MyPartitioner() )
輪詢分發(fā)策略 和 粘性分發(fā)策略
輪詢分發(fā)策略: 在Kafka的老版本中存在的一種分發(fā)策略,當(dāng)生產(chǎn)數(shù)據(jù)的時候,只有value但是沒有key的時候,采用輪詢。 ?? ?優(yōu)點: 可以保證每個分區(qū)拿到的數(shù)據(jù)基本是一樣,因為是一個一個的輪詢的分發(fā) ?? ?缺點: 如果采用異步發(fā)送方式,意味著一批數(shù)據(jù)發(fā)送到broker端,由于是輪詢策略,會將這一批數(shù)據(jù)拆分為多個小的批次,分別再寫入到不同的分區(qū)里面去,寫入進(jìn)去以后,每個分區(qū)都會給予響應(yīng),會影響寫入效率。 ?? ? 粘性分發(fā)策略: 在Kafka新版本中存在的一種分發(fā)策略。當(dāng)生產(chǎn)數(shù)據(jù)的時候,只有value但是沒有key的時候,采用粘性分發(fā)策略 ?? ?優(yōu)點: 在發(fā)送數(shù)據(jù)的時候,首先會隨機(jī)的選取一個分區(qū),然后盡可能將數(shù)據(jù)分發(fā)到這個分區(qū)上面去,也就是盡可能粘著這個分區(qū)。該分發(fā)方式,在異步發(fā)送的操作中,效率比較高。 ?? ?缺點: 在數(shù)據(jù)發(fā)送特別快的時候,可能會導(dǎo)致某個分區(qū)的數(shù)據(jù)比其他分區(qū)數(shù)據(jù)多很多,造成大量的數(shù)據(jù)集中在一個分區(qū)上面
Kafka消費者的負(fù)載均衡機(jī)制
1.在同一個消費組中,消費者的個數(shù)最多不能超過Topic的分區(qū)數(shù),如果超過了,就會有一些消費者處于閑置狀態(tài),消費不到任何數(shù)據(jù)
2.在同一個消費組中,一個Topic中一個分區(qū)的數(shù)據(jù),只能被同個消費組中的一個消費者所消費,不能被同個消費者組中多個消費者所消費,但是一個消費組可以消費多個分區(qū)的數(shù)據(jù),也就是分區(qū)和消費的對應(yīng)關(guān)系,多對一
3.不同的消費組中的消費者,可以對一個Topic的數(shù)據(jù)同時消費,也就是不同消費組間沒有任何關(guān)系,也就是Topic的數(shù)據(jù)能夠被對個消費組中的消費者重復(fù)消費
查看消費組中有多少個消費者,用來避免消費者個數(shù)超過分區(qū)個數(shù)。
./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_1 --members --describe
數(shù)據(jù)不丟失機(jī)制
生產(chǎn)端保證消息不丟失
生產(chǎn)者端保證數(shù)據(jù)不丟失:
? 生產(chǎn)者端將消失發(fā)送給到kafka集群以后,broker要給生產(chǎn)者響應(yīng)信息,響應(yīng)原理就是ACK機(jī)制
ACK機(jī)制當(dāng)中有3個參數(shù)配置值,分別是: 0? 1? -1 (all)
0 :生產(chǎn)者生產(chǎn)消息給到kafka集群,生產(chǎn)者不等待(不接收)broker返回的響應(yīng)信息
1 :生產(chǎn)者生產(chǎn)消息給到kafka集群,kafka集群中的分區(qū)對應(yīng)的Leader主副本所在的broker給生產(chǎn)者返回響應(yīng)信息
-1(all) :生產(chǎn)者生產(chǎn)消息給到kafka集群,kafka集群中的分區(qū)對應(yīng)的所有副本給生產(chǎn)者返回響應(yīng)信息
消息的生產(chǎn)效率排序(由高到低):0>1>-1
消息的安全級別排序(由高到低):-1>1>0
在實際工作中如何選擇ACK參數(shù)配置? 答:根據(jù)數(shù)據(jù)的重要程度進(jìn)行選擇。如果數(shù)據(jù)重要,優(yōu)先保證數(shù)據(jù)的安全性,再考慮生產(chǎn)效率;如果數(shù)據(jù)不重要,優(yōu)先考慮生產(chǎn)效率,再盡可能提升安全級別。
相關(guān)的參數(shù)
1- acks? broker節(jié)點確認(rèn)機(jī)制 ??? 默認(rèn)值:1;數(shù)據(jù)類型:string
2- buffer.memory 緩存大小 ??? 默認(rèn)值:33554432(32MB)
3- retries 失敗后重試次數(shù) ??? 默認(rèn)值:2147483647,該值沒有意義,一般是使用delivery.timeout.ms參數(shù)進(jìn)行控制
4- delivery.timeout.ms 消息傳輸超時時間 ??? 默認(rèn)值:120000(120秒)
5- batch.size 每一批次的消息數(shù)據(jù)的大小 ??? 默認(rèn)值:16384(16KB)
6- linger.ms 每一批次的間隔時間 ??? 默認(rèn)值:0
Broker端保證數(shù)據(jù)不丟失
? Broker端通過多副本機(jī)制確保數(shù)據(jù)不丟失。同時需要生產(chǎn)者端將acks設(shè)置為-1
消費端如何保證數(shù)據(jù)不丟失
消費者消費消息的步驟:
1.消費者首先連接到kafka集群中,進(jìn)行消息的消費
2.kafka集群接收到Consumer消費者的消費請求后,首先會根據(jù)group id(消費組名稱),查找上次消費消息對應(yīng)的offset(偏移量)
3.如果沒有查到offset,消費者默認(rèn)從Topic最新的地方開始消費
4.如果有查到offset,會從上次消費到的offset地方進(jìn)行繼續(xù)消費
??????? 4-1.首先先確定要讀取的這個offset偏移量在哪個segment文件中
??????? 4-2.查詢這個segment文件對應(yīng)的index文件,根據(jù)offset確定這個消息在log文件的什么位置,也就是確定消息的物理偏移量
??????? 4-3:讀取log文件,查詢對應(yīng)范圍內(nèi)的數(shù)據(jù)即可
??????? 4-4:獲取最終的消息數(shù)據(jù)
5.消費者在消費的過程中,底層有個線程會定時的將消費的offset提交給到kafka集群,kafka集群會更新對應(yīng)的offset的值
該流程能夠保證消費端不丟失數(shù)據(jù)嗎? ?????????可以保證消費端數(shù)據(jù)不丟失。但是會出現(xiàn)重復(fù)消費的情況。
消費組的offset信息保存在什么地方? ????????Kafka集群內(nèi)部會創(chuàng)建一個叫做__consumer_offsets的Topic來保存offset信息。該Topic有50個分區(qū),1個副本
Kafka中消費者如何對數(shù)據(jù)僅且只消費一次?
1- 將消費者的 enable.auto.commit 屬性設(shè)置為 false,并手動管理消費者的偏移量。這樣可以確保消費者在處理完所有消息后才更新偏移量,避免重復(fù)消費數(shù)據(jù)。也就是將消息的消費、消息業(yè)務(wù)處理代碼、offset提交代碼放在同一個事務(wù)當(dāng)中。
2- 使用冪等生產(chǎn)者或事務(wù)性生產(chǎn)者來確保消息只被發(fā)送一次。這樣可以避免重復(fù)發(fā)送消息,從而避免消費者重復(fù)消費數(shù)據(jù)。
3- 在消息中加入唯一的ID
在提交偏移量的時候,有二種提交方式: 自動提交偏移量 和 手動提交偏移量,手動提交又分了同步和異步
Kafka的數(shù)據(jù)積壓
? 數(shù)據(jù)持續(xù)在kafka集群中積壓,也就是lag的值,一直在增大沒有在減小,正常情況下,lag的值是來回波動的
通過命令的方式查看數(shù)據(jù)積壓
kafka集群有哪些消費組
./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --list
查看指定消費組數(shù)據(jù)積壓情況
./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092? --describe --group g_2
?數(shù)據(jù)積壓問題處理
出現(xiàn)積壓的原因:
1.因為寫入目的容器失敗,從而導(dǎo)致消費失敗
2.因為網(wǎng)絡(luò)延遲消費失敗
3.消費邏輯過于復(fù)雜,導(dǎo)致消費過慢,出現(xiàn)積壓問題
解決方案:
1.處理異常容器,保證一直可用狀態(tài)
2.對于第二種, 如果之前一直沒問題, 只是某一天出現(xiàn), 可以調(diào)整消費的超時時間。并且同時解決網(wǎng)絡(luò)延遲問題
3.對于第三種,調(diào)整消費代碼,消費更快,利用消費者的負(fù)載均衡策略,提升消費者數(shù)量
柚子快報激活碼778899分享:分布式 Kafka的核心原理
精彩文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。