柚子快報邀請碼778899分享:分布式 kafka事務的詳解
柚子快報邀請碼778899分享:分布式 kafka事務的詳解
一 kafka事務的機制
1.1 冪等性
Producer 的冪等性指的是當發(fā)送同一條消息時,數據在 Server 端只會被持久化一次,數據不丟不重,Kafka為了實現冪等性,底層設計架構中引入了ProducerID和SequenceNumbe。
當Producer發(fā)送消息(x2,y2)給Broker時,Broker接收到消息并將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發(fā)生異常導致Producer接收Ack信號失敗。對于Producer來說,會觸發(fā)重試機制,將消息(x2,y2)再次發(fā)送,但是,由于引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發(fā)送給Broker,而之前Broker緩存過之前發(fā)送的相同的消息,那么在消息流中的消息就只有一條(x2,y2),不會出現重復發(fā)送的情況。
缺點:Kafka 的 Exactly Once 冪等性只能保證單次會話內的精準一次性,不能解決跨會話和跨分區(qū)的問題;
1.2?kafka的事務機制
1.2.1 事務的作用
kafka的事務機制,是kafka實現端到端有且僅有一次語義的基礎。Kafka 的 Exactly Once 冪等性只能保證單次會話內的精準一次性,不能解決跨會話和跨分區(qū)的問題;
Kafka的事務特性本質上是支持了Kafka跨分區(qū)和Topic的原子寫操作。通過事務機制,KAFKA 可以實現對多個 topic 的多個 partition 的原子性的寫入,即處于同一個事務內的所有消息,不管最終需要落地到哪個 topic 的哪個 partition, 最終結果都是要么全部寫成功,要么全部寫失?。ˋtomic multi-partition writes);開啟事務,必須開啟冪等性,KAFKA的事務機制,在底層依賴于冪等生產者。
Kafka的事務特性就是要確保跨分區(qū)的多個寫操作的原子性。
具體的場景包括:Producer多次發(fā)送消息可以封裝成一個原子性操作,即同時成功,或者同時失敗;(可以是跨多分區(qū)的寫入)Consumer-Transform-Producer模式下,因為消費者提交偏移量出現問題,導致在重復消費消息時,生產者重復生產消息。需要將這個模式下消費者提交偏移量操作和生成者一系列生成消息的操作封裝成一個原子操作。
為支持事務機制,KAFKA 引入了兩個新的組件:Transaction Coordinator 和 Transaction Log,其中 transaction coordinator 是運行在每個 kafka broker 上的一個模塊,是 kafka broker 進程承載的新功能之一(不是一個獨立的新的進程);而 transaction log 是 kakafa 的一個內部 topic;
1.2.2 事務的原子性
事務原子性是指 Producer 將多條消息作為一個事務批量發(fā)送,要么全部成功要么全部失敗。 引入了一個服務器端的模塊,名為Transaction Coordinator,用于管理 Producer 發(fā)送的消息的事務性。
該Transaction Coordinator維護Transaction Log,該 log 存于一個內部的 Topic 內。由于 Topic 數據具有持久性,因此事務的狀態(tài)也具有持久性。
Producer 并不直接讀寫Transaction Log,它與Transaction Coordinator通信,然后由Transaction Coordinator將該事務的狀態(tài)插入相應的Transaction Log。
Transaction Log的設計與Offset Log用于保存 Consumer 的 Offset 類似。
Kafka事務的回滾,并不是刪除已寫入的數據,而是將寫入數據的事務標記為 Rollback/Abort 從而在讀數據時過濾該數據。
1.2.3?拒絕僵尸實例
在分布式系統(tǒng)中,一個instance的宕機或失聯,集群往往會自動啟動一個新的實例來代替它的工作。此時若原實例恢復了,那么集群中就產生了兩個具有相同職責的實例,此時前一個instance就被稱為“僵尸實例(Zombie Instance)”。在Kafka中,兩個相同的producer同時處理消息并生產出重復的消息(read-process-write模式),這樣就嚴重違反了Exactly Once Processing的語義。這就是僵尸實例問題。
解決辦法:
kafka事務特性通過transaction-id屬性來解決僵尸實例問題。所有具有相同transaction-id的Producer都會被分配相同的pid,同時每一個Producer還會被分配一個遞增的epoch。Kafka收到事務提交請求時,如果檢查當前事務提交者的epoch不是最新的,那么就會拒絕該Producer的請求。從而達成拒絕僵尸實例的目標。
1.2.4 開啟事務的生產者和消費者
1)生產者:開啟了事務的生產者,生產的消息最終還是正常寫到目標 topic 中,但同時也會通過 transaction coordinator 使用兩階段提交協(xié)議,將事務狀態(tài)標記 transaction marker,也就是控制消息 controlBatch,寫到目標 topic 中,控制消息共有兩種類型 commit 和 abort,分別用來表征事務已經成功提交或已經被成功終止;
2)消費者:開啟了事務的消費者,如果配置讀隔離級別為 read-committed, 在內部會使用存儲在目標 topic-partition 中的事務控制消息,來過濾掉沒有提交的消息,包括回滾的消息和尚未提交的消息,從而確保只讀到已提交的事務的 message;
開啟了事務的消費者,過濾消息時,KAFKA consumer 不需要跟 transactional coordinator 進行 rpc 交互,因為 topic 中存儲的消息,包括正常的數據消息和控制消息,包含了足夠的元數據信息來支持消息過濾;
3)總結:當然 kakfa 的 producer 和 consumer 是解耦的,你也可以使用非 transactional consumer 來消費 transactional producer 生產的消息,此時目標 topic-partition 中的所有消息都會被返回,不會進行過濾,此時也就丟失了事務 ACID 的支持;
1.3 事務的api
對于Producer,需要設置transactional.id屬性,這個屬性的作用下文會提到。設置了transactional.id屬性后,enable.idempotence屬性會自動設置為true。
對于Consumer,需要設置isolation.level = read_committed,這樣Consumer只會讀取已經提交了事務的消息。另外,需要設置enable.auto.commit = false來關閉自動提交Offset功能。
1.生產者
/**
* 初始化事務
*/
public void initTransactions();
/**
* 開啟事務
*/
public void beginTransaction() throws ProducerFencedException ;
/**
* 在事務內提交已經消費的偏移量
*/
public void sendOffsetsToTransaction(Map
String consumerGroupId) throws ProducerFencedException ;
/**
* 提交事務
*/
public void commitTransaction() throws ProducerFencedException;
/**
* 丟棄事務
*/
public void abortTransaction() throws ProducerFencedException ;
2.Write-process-wirte
KafkaProducer producer = createKafkaProducer(
"bootstrap.servers", "localhost:9092",
"transactional.id”, “my-transactional-id");
producer.initTransactions();
producer.beginTransaction();
producer.send("outputTopic", "message1");
producer.send("outputTopic", "message2");
producer.commitTransaction();
3.Read-process-Write
KafkaProducer producer = createKafkaProducer(
"bootstrap.servers", "localhost:9092",
"transactional.id", "my-transactional-id");
KafkaConsumer consumer = createKafkaConsumer(
"bootstrap.servers", "localhost:9092",
"group.id", "my-group-id",
"isolation.level", "read_committed");
consumer.subscribe(singleton("inputTopic"));
producer.initTransactions();
while (true) {
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
producer.beginTransaction();
for (ConsumerRecord record : records)
producer.send(producerRecord(“outputTopic”, record));
producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
producer.commitTransaction();
}
1.4 事務的原理
圖中的 Transaction Coordinator 運行在 Kafka 服務端,下面簡稱 TC 服務。 __transaction_state 是 TC 服務持久化事務信息的 topic 名稱,下面簡稱事務 topic。 Producer 向 TC 服務發(fā)送的 commit 消息,下面簡稱事務提交消息。 TC 服務向分區(qū)發(fā)送的消息,下面簡稱事務結果消息。
一文讀懂 kafka 的事務機制 - 知乎
kafka之事務_kafka 事務_你的boy_Z的博客-CSDN博客
Kafka事務解析 - 知乎
https://www.cnblogs.com/xijiu/p/16917741.html
柚子快報邀請碼778899分享:分布式 kafka事務的詳解
推薦閱讀
本文內容根據網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯系刪除。