柚子快報(bào)激活碼778899分享:kafka(一):生產(chǎn)者
柚子快報(bào)激活碼778899分享:kafka(一):生產(chǎn)者
生產(chǎn)者在消息發(fā)送的過程中,涉及到了 兩個(gè)線程 ——main 線程和Sender 線程。 main線程創(chuàng)建 Producer 對(duì)象,調(diào)用 send 函數(shù)發(fā)送消息的經(jīng)過:
1.創(chuàng)建ProducerRecord:
應(yīng)用程序首先將待發(fā)送的數(shù)據(jù)封裝成一個(gè)ProducerRecord對(duì)象。這個(gè)對(duì)象包含了消息的Topic、Partition、可選的Key和Value,以及時(shí)間戳等信息。
2.攔截器:
生產(chǎn)者攔截器的使用主要是通過自定義實(shí)現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor接口來完成。這個(gè)接口定義了兩個(gè)主要的方法:onSend()和onCompletion()。onSend()方法在消息被序列化以及計(jì)算分區(qū)前調(diào)用,可以在這個(gè)方法中對(duì)消息進(jìn)行修改或過濾等操作。onCompletion()方法則在消息從RecordAccumulator成功發(fā)送到Kafka Broker,或者在發(fā)送失敗時(shí)調(diào)用,可以在這個(gè)方法中進(jìn)行一些發(fā)送后的處理,比如統(tǒng)計(jì)發(fā)送狀態(tài)等。
此外,Kafka生產(chǎn)者允許用戶指定多個(gè)攔截器,這些攔截器會(huì)按照指定的順序形成一個(gè)攔截鏈。在消息發(fā)送過程中,攔截鏈中的每個(gè)攔截器都會(huì)按照順序?qū)ο⑦M(jìn)行處理。
3.序列化:
Kafka是面向字節(jié)的系統(tǒng),因此ProducerRecord中的Key和Value需要序列化成字節(jié)序列。這通常通過序列化器(Serializer)來完成,例如StringSerializer或ByteArraySerializer。
4.分區(qū)器:
Kafka使用分區(qū)機(jī)制來保證消息的順序性和擴(kuò)展性。Producer根據(jù)Key(如果提供)和分區(qū)器(Partitioner)來決定將消息發(fā)送到哪個(gè)分區(qū)。
如果ProducerRecord對(duì)象中指定了Partition,則會(huì)優(yōu)先使用該P(yáng)artition如果ProducerRecord對(duì)象中沒有指定Partition,但是有Key,則會(huì)根據(jù)Key的hashcode與分區(qū)數(shù)取余得到具體分區(qū)號(hào)如果沒有指定Partition,也沒有Key,則會(huì)才用黏性分區(qū)的方法,會(huì)隨機(jī)選擇一個(gè)分區(qū),并盡可能一直使用這個(gè)分區(qū),待該分區(qū)的batch,size滿了或者linger.ms時(shí)間到了,再隨機(jī)選擇一個(gè)分區(qū)(絕對(duì)不是上一個(gè))進(jìn)行使用
5.緩存到RecordAccumulator :
序列化后的消息被緩存到RecordAccumulator中。這是一個(gè)內(nèi)存緩沖區(qū),用于收集將要發(fā)送的消息批次,以Topic為分組。每一組內(nèi)有拆分成多個(gè)批次,每個(gè)批次的默認(rèn)大小是16K。每個(gè)Deque是一個(gè)雙端隊(duì)列,創(chuàng)建隊(duì)列都是在內(nèi)存里完成的,默認(rèn)是32M,主線程append時(shí),往隊(duì)尾插入,sender線程取出時(shí),從隊(duì)頭取出。這樣做的好處是減少網(wǎng)絡(luò)IO的請(qǐng)求,增加吞吐量。
由于RecordAccumulator的緩存空間有限,如果空間被占滿,那么當(dāng)我們?cè)俅握{(diào)用KafkaProducer的send(…)方法的時(shí)候,就會(huì)出現(xiàn)阻塞(默認(rèn)60秒,可以通過參數(shù)max.block.ms來配置),如果阻塞超時(shí),則會(huì)拋出異常。
6.Sender線程從隊(duì)列里拉取數(shù)據(jù):
sender線程從隊(duì)列中拉取在RecordAccumulator中等待的數(shù)據(jù)
每次批處理batch.size的大小默認(rèn)為 16k延遲時(shí)間 linger.ms 默認(rèn)為 0ms,沒有延遲
這個(gè)兩個(gè)條件達(dá)到任意一個(gè)就可以發(fā)送數(shù)據(jù)。值得注意的是,send線程再拉去數(shù)據(jù)后,會(huì)按照Broker為組,重新把ProderBatch進(jìn)行分組。因?yàn)椴煌腡opic也可能是在同一個(gè)Broker上的,所以拉取之后的數(shù)據(jù)結(jié)構(gòu)依然是Deque
注意:拉取RecordAccumulator中的數(shù)據(jù),但是并不會(huì)從RecordAccumulator中刪除這部分?jǐn)?shù)據(jù)。
7.封裝成ProducerRequest:
消息發(fā)送前,還會(huì)對(duì)Deque再度進(jìn)行封裝成ProducerRequest,然后以Broker為組發(fā)送到NewWorkClient。
8.緩存到InFightRequest:
InFightRequest(在途請(qǐng)求緩沖區(qū))內(nèi)維護(hù)了一個(gè)key是Broker,value是Deque
InFightRequest 類中有一個(gè)屬性 maxInFlightRequestsPerConnection, 標(biāo)識(shí)一個(gè)Broker節(jié)點(diǎn)最多可以緩存多少個(gè)請(qǐng)求。該默認(rèn)值為 5, 可通過 max.in.flight.requests.per.connection 進(jìn)行配置, 需要注意的是 InFlightRequests 對(duì)象是在創(chuàng)建 KafkaProducer 時(shí)就會(huì)被創(chuàng)建。
9.使用NIO發(fā)送:
Sender線程使用Java NIO(非阻塞I/O)機(jī)制,將消息批次異步發(fā)送到對(duì)應(yīng)的Broker。NIO允許在單個(gè)線程內(nèi)處理多個(gè)網(wǎng)絡(luò)連接,從而提高網(wǎng)絡(luò)操作的效率。
10.發(fā)送到Broker集群&回調(diào)處理:
Kafka集群在收到響應(yīng)后會(huì)返回Ack,有兩種情況
返回成功,則會(huì)執(zhí)行剩余流程返回失敗,則會(huì)進(jìn)行retry重試機(jī)制,默認(rèn)重試次數(shù)是int的最大值,
同時(shí)Kafka提供了三種Ack的應(yīng)答級(jí)別
0 Partition的leader接收到消息還沒有寫入磁盤就已經(jīng)返回ack,當(dāng)Leader故障時(shí)有可能丟失數(shù)據(jù)。1 Partition的Leader落盤成功后返回Ack。如果在ISR隊(duì)列里面的節(jié)點(diǎn)同步成功之前,Leader故障了,盡管 Leader 已經(jīng)落盤成功,將會(huì)丟失 ISR內(nèi)Follower 還未同步 Leader的 那部分?jǐn)?shù)據(jù)。-1 Partition的Leader和ISR隊(duì)列里面所有節(jié)點(diǎn)全部落盤成功后才返回Ack。
Q1:Leader收到數(shù)據(jù),但是有一個(gè)Follower因?yàn)楣收希?一直沒有同步消息,怎么解決?
A1:首先明確一下ISR的概念,Leader維護(hù)了一個(gè)動(dòng)態(tài)隊(duì)列,意為和Learder保持同步的Follower+Leader的集合(leader:0 Isr:0,1,2)。如果Follower長(zhǎng)時(shí)間未向Leader發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該Follower將被踢出ISR。該時(shí)間閾值由replica.ag.ime.max.ms參數(shù)設(shè)定,默認(rèn)30s。例如2超時(shí)(leader:0 isr:0,l)
如果分區(qū)副本設(shè)置為1個(gè),或者ISR里應(yīng)答的最小副本數(shù)量(min.insync.replicas默認(rèn)為1)設(shè)置為1,和ack=l的效果是一樣的,仍然有丟數(shù)的風(fēng)險(xiǎn)(leader:0,isr:0)。
數(shù)據(jù)完全可靠條件=ACK級(jí)別設(shè)置為-1+分區(qū)副本大于等于2+ISR里應(yīng)答的最小副本數(shù)量大于等于2
注意,這里的“副本”并不是指的 Follower;在 Kafka 中,副本分為 Leader 副本和 Follower 副本。Leader 副本負(fù)責(zé)處理消息,而 Follower 副本則簡(jiǎn)單地復(fù)制 Leader 副本的數(shù)據(jù)。也就是一個(gè)分區(qū)至少要有 1 個(gè) Leader 和 1 個(gè) Follower,ISR 隊(duì)列最少也要有 1 個(gè) Leader 和 1 個(gè) Follower。
Q2:如果在Follower同步完成后,Broker發(fā)送Ack之前,leader發(fā)生故障,即選舉出新的 Leader,新的 Leader 將再次落盤一次,那么會(huì)造成數(shù)據(jù)重復(fù),怎么解決?
A2:Kafka0.11版本以后,引入了一項(xiàng)重大特性:冪等性。
生產(chǎn)者在發(fā)送每條消息Batch消息的時(shí)候,會(huì)為消息分配一個(gè)唯一標(biāo)識(shí)
所以冪等性只能保證的是在單分區(qū)單會(huì)話內(nèi)不重復(fù)。而且因?yàn)镻roducerStateManager只會(huì)緩存最近5個(gè)的數(shù)據(jù),所以在途請(qǐng)求區(qū)的數(shù)量也要設(shè)置成不能大于5,否則依舊會(huì)出現(xiàn)重復(fù)的現(xiàn)象。
Q3:每條batch數(shù)據(jù)都是帶有Sequence Number的,比如有1~5條消息,其中序號(hào)是2的消息因?yàn)榫W(wǎng)絡(luò)原因,Broker沒有響應(yīng)Ack,因?yàn)橹卦嚈C(jī)制序號(hào)2就會(huì)重新發(fā)送。此時(shí)消息的順序就變成了2->5->4->3->1,產(chǎn)生了亂序,這種情況應(yīng)該如何解決?
A3:ProducerStateManager會(huì)根據(jù)Sequence Number的連續(xù)性來判斷,如果不連續(xù)或者新來數(shù)據(jù)的Sequence Number大于緩存的最大Sequence Number,那說明有數(shù)據(jù)漏掉了。則會(huì)報(bào)錯(cuò),讓Producer從緩沖區(qū)沖重新推送 完整的數(shù)據(jù)
配置上:1.Ack=-1 2.enable.idempotence 3.在途請(qǐng)求區(qū)大小設(shè)置成5
Q4:但是對(duì)于冪等性而言是有一個(gè)缺陷的,Producer一旦重啟,分配的PID就會(huì)變化,依然會(huì)導(dǎo)致冪等性的問題
A4:這就需要引入事務(wù),事務(wù)是解決跨會(huì)話的分區(qū)的問題,對(duì)于Producer,需要設(shè)置transactional.id屬性。(事務(wù)部分比較復(fù)雜,有機(jī)會(huì)單獨(dú)寫一期來講)
每條消息發(fā)送后,Broker回復(fù)響應(yīng),以便后續(xù)處理 org.apache.kafka.clients.producer.internals.Sender#handleProduceResponse
11.回調(diào):
Producer根據(jù)Acks配置等待Broker的確認(rèn)。
如果發(fā)送失敗,根據(jù)retries和retry.backoff.ms配置,Producer可能會(huì)重試發(fā)送消息如果發(fā)送成功,則移除在途緩沖區(qū)內(nèi)的Request
12.回調(diào)處理:
如果消息被成功Ack,則清理掉RecordAccumulator中的緩存數(shù)據(jù)
柚子快報(bào)激活碼778899分享:kafka(一):生產(chǎn)者
推薦閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。