柚子快報邀請碼778899分享:Kafka—ISR機(jī)制
ISR機(jī)制 Kafka 中的 ISR(In-Sync Replicas)機(jī)制是一種用于確保數(shù)據(jù)可靠性和一致性的重要機(jī)制。ISR 是一組副本,它包括分區(qū)的領(lǐng)導(dǎo)者(Leader)和追隨者(Follower)副本,這些副本與領(lǐng)導(dǎo)者保持?jǐn)?shù)據(jù)同步。
ISR 關(guān)鍵概念
領(lǐng)導(dǎo)者和追隨者:每個分區(qū)有一個領(lǐng)導(dǎo)者和零個或多個追隨者。領(lǐng)導(dǎo)者負(fù)責(zé)處理客戶端的寫請求,而追隨者主要用于數(shù)據(jù)復(fù)制。
ISR 集合:ISR 集合是分區(qū)領(lǐng)導(dǎo)者的一組追隨者副本,它們與領(lǐng)導(dǎo)者保持?jǐn)?shù)據(jù)同步。只有在 ISR 集合中的追隨者副本可以參與數(shù)據(jù)的寫入和讀取操作。 數(shù)據(jù)復(fù)制:領(lǐng)導(dǎo)者將消息寫入其本地日志,并定期將這些消息發(fā)送給 ISR 集合中的追隨者。追隨者接收消息后,將其寫入本地日志,以保持?jǐn)?shù)據(jù)同步。 Leader Epoch 和 Log Start Offset:ISR 集合中的每個追隨者都維護(hù)了領(lǐng)導(dǎo)者的日志信息,包括領(lǐng)導(dǎo)者的 Leader Epoch 和 Log Start Offset。這些信息用于確保數(shù)據(jù)的正確復(fù)制和同步。 數(shù)據(jù)一致性:只有在 ISR 集合中的所有追隨者都成功復(fù)制了一條消息后,領(lǐng)導(dǎo)者才會將該消息標(biāo)記為已提交,確保數(shù)據(jù)的一致性。 故障處理:如果某個追隨者發(fā)生故障或者追趕進(jìn)度過慢,那么該追隨者可能會被從 ISR 集合中移除。這有助于保持?jǐn)?shù)據(jù)的可靠性和避免影響性能。
其中,需要注意的的概念:
分區(qū)中的所有副本統(tǒng)稱為AR(Assigned Replicas)。 所有Leader副本加上和Leader副本保持同步的Follower副本組成ISR(In-Sync Replicas)。 所有沒有保持同步的Follower副本組成OSR(Out-of-Sync Replicas)。 AR = ISR + OSR。正常情況下,所有Follower副本都應(yīng)該和Leader副本一致,即AR=ISR。 當(dāng)Leader故障時,在ISR集合中的Follower才有資格被選舉為新的Leader。
HW和LEO 在 Kafka 中,HW(High Watermark)和 LEO(Log End Offset)是與數(shù)據(jù)復(fù)制和消費(fèi)有關(guān)的兩個重要概念。
HW(High Watermark):HW 是指在分區(qū)中,已經(jīng)被所有追隨者(Follower)副本復(fù)制的消息的位置。HW 是每個分區(qū)的屬性,它表示已經(jīng)提交的消息。只有在 HW 之前的消息才被認(rèn)為是已經(jīng)提交的,這些消息已經(jīng)被寫入分區(qū)的所有追隨者副本,并且被認(rèn)為是安全的,不會丟失。HW 是為了確保數(shù)據(jù)一致性和可靠性而引入的。
LEO(Log End Offset):LEO 是指在分區(qū)中當(dāng)前最新消息的位置。LEO 表示分區(qū)日志中的最后一條消息的偏移量。LEO 包括已經(jīng)被寫入但尚未被所有追隨者副本復(fù)制的消息,以及正在等待被寫入的消息。LEO 是一個動態(tài)的屬性,它會隨著新消息的寫入而逐漸增加。
HW 和 LEO 之間的關(guān)系非常重要,它們可以幫助確保數(shù)據(jù)的可靠性和一致性:
HW 之前的消息是已經(jīng)提交的消息,它們在數(shù)據(jù)復(fù)制中是安全的,不會丟失。 LEO 之前的消息是已經(jīng)寫入但尚未被所有追隨者副本復(fù)制的消息。這些消息可能會在 HW 之前被提交,也可能會在之后被提交。 一旦 HW 追趕上 LEO,表示所有的消息都已經(jīng)提交,分區(qū)的數(shù)據(jù)一致性得到了保障。
Kafka的消息同步流程:
初始狀態(tài),HW和LEO在同一位置。消費(fèi)者可以讀取的有效消息為0,1,2,3. 消息寫入Leader,LEO位置改變。Follower進(jìn)行同步。
3. Follower同步進(jìn)度決定HW位置,消費(fèi)者可讀的有效消息0,1,2,3,4。
完成同步,消費(fèi)者可讀的有效消息0,1,2,3,4,5,6。
可以看出,Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純異步復(fù)制。
同步復(fù)制要求所有Follower副本都復(fù)制完,太影響性能了。 異步復(fù)制只要數(shù)據(jù)被寫入Leader副本就認(rèn)為提交成功,在這種情況下,如果Leader宕機(jī)時候Follower還是落后于Leader就會造成數(shù)據(jù)丟失。
而Kafka使用的ISR機(jī)制則有效地權(quán)衡了數(shù)據(jù)可靠性和性能之間的關(guān)系。
Java使用Kafka通信
以下是 Kafka 生產(chǎn)者和消費(fèi)者的簡單示例,使用 Kafka 的 Java 客戶端庫(Kafka Producer 和 Kafka Consumer)來創(chuàng)建一個基本的消息傳遞示例。 Kafka 生產(chǎn)者示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
?
public class KafkaProducerExample {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092"; // Kafka 服務(wù)器地址
String topic = "my-topic"; // Kafka 主題名稱
?
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
?
Producer
?
// 發(fā)送消息
producer.send(new ProducerRecord<>(topic, "key", "Hello, Kafka!"), (metadata, exception) -> {
if (exception == null) {
System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());
} else {
System.err.println("Error sending message: " + exception.getMessage());
}
});
?
producer.close();
}
}
Kafka 消費(fèi)者示例
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.time.Duration;
import java.util.Collections;
?
public class KafkaConsumerExample {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092"; // Kafka 服務(wù)器地址
String groupId = "my-group"; // 消費(fèi)者組 ID
String topic = "my-topic"; // Kafka 主題名稱
?
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("group.id", groupId);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
?
KafkaConsumer
consumer.subscribe(Collections.singletonList(topic));
?
while (true) {
ConsumerRecords
for (ConsumerRecord
System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
}
}
}
}
柚子快報邀請碼778899分享:Kafka—ISR機(jī)制
文章來源
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。