欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)邀請(qǐng)碼778899分享:分布式 Kafka基礎(chǔ) (上)

柚子快報(bào)邀請(qǐng)碼778899分享:分布式 Kafka基礎(chǔ) (上)

http://yzkb.51969.com/

前言

各位清明 快樂(lè)呀,近期博主也是學(xué)習(xí)了一下kafka,以下是博主的一些學(xué)習(xí)筆記,希望對(duì)你有所幫助

前置知識(shí)

線程中的數(shù)據(jù)交互以及進(jìn)程中的數(shù)據(jù)交互

我們知道線程之間可以使用堆空間進(jìn)行數(shù)據(jù)交互的

但是如果發(fā)送方和接收方處理數(shù)據(jù)的效率差距過(guò)大,這里就會(huì)造成消息積壓的問(wèn)題,怎么處理呢?存入文件顯然是不可取的,因?yàn)檫@里文件的大小也是有上限的,所以我們加上一個(gè)中間件,也就是kafka

這里進(jìn)程呢?

進(jìn)程之間肯定是不可以使用共用內(nèi)存進(jìn)行交互的,這里就采用網(wǎng)絡(luò)傳輸?shù)姆绞竭M(jìn)行交互,因?yàn)樗麄兊膬?nèi)存都是獨(dú)立存在的,使用socket網(wǎng)絡(luò)傳輸即可

我們知道一個(gè)一般處理消息的不止一個(gè)消費(fèi)者,這樣直接讓消費(fèi)者和生產(chǎn)者進(jìn)行交互耦合度也就太高了,我們也引入了消息中間件來(lái)降低消息的耦合度吧

JMS Java Message Service

JMS包含了p2p和消息訂閱發(fā)布模型,基本上很多mq都是遵循這個(gè)模型的

我們kafka沒(méi)有加上mq的的后綴,他其實(shí)不是完全遵循這個(gè)模型

下面我們介紹一下這個(gè)模型

p2p 點(diǎn)對(duì)點(diǎn)模型

這里指的是一條消息只能被消費(fèi)者消費(fèi)一次,然后消費(fèi)者會(huì)給生產(chǎn)者一個(gè)反饋

sub/pub訂閱發(fā)布模型

生產(chǎn)者生產(chǎn)的消息會(huì)發(fā)送到對(duì)應(yīng)的topic,訂閱了這個(gè)topic的消費(fèi)者都可以消費(fèi)數(shù)據(jù),同樣的數(shù)據(jù)可以被不同的消費(fèi)者進(jìn)行消費(fèi)

注:本文基于的Windows的kafka進(jìn)行演示學(xué)習(xí),kafka一般部署在linux操作系統(tǒng)上

kafka的生產(chǎn)者消費(fèi)者模型

kafka在底層大量的使用生產(chǎn)者消費(fèi)者模型

并且為了保證數(shù)據(jù)的安全性,其還使用了日志文件進(jìn)行了數(shù)據(jù)的保存

下面我們通過(guò)一個(gè)簡(jiǎn)單的helloworld程序來(lái)感受一下

我們先啟動(dòng)zookeeper 再啟動(dòng)kafka即可

注意,先進(jìn)行修改 兩個(gè)配置文件,存放對(duì)應(yīng)的data

注:記得進(jìn)入對(duì)應(yīng)的文件夾,使用對(duì)應(yīng)的bat腳本文件

先演示一下單機(jī)

開(kāi)啟zookeeper腳本

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

開(kāi)啟kafka腳本

call bin/windows/kafka-server-start.bat config/server.properties

創(chuàng)建主題

查看主題

執(zhí)行經(jīng)典helloworld

注:啟動(dòng)完一定要先創(chuàng)建主題,主題是kafka一個(gè)基本的邏輯分類(lèi)單位,先開(kāi)啟zookeeper再開(kāi)啟kafka

如果這里kafka客戶端一閃而過(guò)啟動(dòng)失敗的情況,直接刪除data文件即可

maven項(xiàng)目簡(jiǎn)單搭建

引入依賴

org.apache.kafka

kafka-clients

3.6.1

注:在kafka中提供服務(wù)的節(jié)點(diǎn)就稱之為broker

producer代碼

package kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;

import java.util.Map;

public class testProducer {

public static void main(String[] args) {

//TODO 創(chuàng)建配置對(duì)象

Map configMap = new HashMap<>();

configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

//TODO 對(duì)生產(chǎn)的KV操作進(jìn)序列化

configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//TODO 創(chuàng)建生產(chǎn)者對(duì)象

//生產(chǎn)者對(duì)象需要確定泛型,是kv類(lèi)型的

KafkaProducer producer = new KafkaProducer<>(configMap);

//TODO 創(chuàng)建數(shù)據(jù)

//構(gòu)建數(shù)據(jù)時(shí),需要傳入三個(gè)參數(shù),主題,key,value

for (int i = 0; i < 10; i++) {

ProducerRecord record = new ProducerRecord<>("test", "key"+i,"hello kafka"+i);

//TODO 通過(guò)生產(chǎn)者對(duì)象將數(shù)據(jù)發(fā)送給kafka

producer.send(record);

}

//TODO 關(guān)閉生產(chǎn)者對(duì)象

producer.close();

}

}

consumer代碼

package kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;

import java.util.HashMap;

import java.util.Map;

public class testConsumer {

public static void main(String[] args) {

//TODO 創(chuàng)建消費(fèi)者對(duì)象

//消費(fèi)者也需要相應(yīng)的配置

Map consumerConfig = new HashMap<>();

consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

//反序列化

consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

//配置groupID

consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);

//TODO 訂閱主題

consumer.subscribe(Collections.singletonList("test"));

//TODO 從kafka的主題中獲取數(shù)據(jù)

//消費(fèi)者從kafka拉取數(shù)據(jù) 不是推送的概念

while(true) {

ConsumerRecords dates = consumer.poll(100);

for (ConsumerRecord date : dates) {

System.out.println(date);

}

}

//TODO 關(guān)閉消費(fèi)者對(duì)象

//consumer.close();

}

}

先啟動(dòng)consumer再啟動(dòng)producer,讓我們?cè)诳梢暬ぞ呱喜榭匆幌滦畔⑹欠翊嬖诹?/p>

這里使用的是kafkatool

執(zhí)行完成就可以發(fā)現(xiàn)數(shù)據(jù)已經(jīng)存在了

?Kafka系統(tǒng)架構(gòu)以及核心組件

我們都知道kafka肯定不是只有一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者呀

當(dāng)這里的數(shù)據(jù)頻繁生產(chǎn)消費(fèi)就可能造成IO熱點(diǎn)問(wèn)題

最后這個(gè)節(jié)點(diǎn)可能就成為分布式系統(tǒng)的性能瓶頸,一旦節(jié)點(diǎn)掛了,就可能造成數(shù)據(jù)丟失

tips:這里的掛了可能只是網(wǎng)絡(luò)不穩(wěn)定,資源耗盡等問(wèn)題導(dǎo)致的長(zhǎng)時(shí)間連接不上

解決方案

橫向擴(kuò)展和縱向擴(kuò)展

橫向擴(kuò)展:使用更快的網(wǎng)絡(luò),更大的磁盤(pán)...無(wú)法根本解決問(wèn)題

縱向擴(kuò)展:使用集群的方式,也是kafka的解決方案

這還沒(méi)有結(jié)束,光增加節(jié)點(diǎn)是沒(méi)用的,生產(chǎn)和消費(fèi)請(qǐng)求還是指向同一個(gè)節(jié)點(diǎn),我們需要將這里的數(shù)據(jù)分散到各個(gè)節(jié)點(diǎn),實(shí)現(xiàn)同一個(gè)主題在不同的broker中

區(qū)分不同的數(shù)據(jù),加上編號(hào)就稱之為分區(qū),這也是kafka物理上的存儲(chǔ)單位?

例如 partition-0

一個(gè)主題可以有多個(gè)分區(qū),分散在不同的broker中

但是每個(gè)消費(fèi)者也不能只消費(fèi)一部分?jǐn)?shù)據(jù)呀,每個(gè)消費(fèi)者向每個(gè)分區(qū)發(fā)送請(qǐng)求,這里效率也是很低的,所以又提出了消費(fèi)組的概念

并且為了數(shù)據(jù)的安全考慮,也提出了將數(shù)據(jù)進(jìn)行備份的方案,但是并不在自己的節(jié)點(diǎn)進(jìn)行備份,因?yàn)樵谧约旱墓?jié)點(diǎn)進(jìn)行備份的話,自己掛了,備份也沒(méi)了,所以這里是在其他節(jié)點(diǎn)保存?zhèn)浞菸募?稱之為foller副本,kafka中備份統(tǒng)稱為副本,主文件叫做leader副本,只有l(wèi)eader副本可以讀寫(xiě),foller副本只負(fù)責(zé)備份.

基礎(chǔ)組件

每一個(gè)kafka節(jié)點(diǎn)中都包含很多個(gè)組件,下面我們來(lái)介紹一下經(jīng)典的幾個(gè)組件

首先就是我們的Controller了,在多個(gè)節(jié)點(diǎn)中我們得選舉出一個(gè)管理者

這里管理者選舉的操作就交給我們的Zookeeper了

這里的選舉也很簡(jiǎn)單粗暴,哪個(gè)節(jié)點(diǎn)先和他建立連接,他就是Controller

Controller的備份

1.采用備份的方式

2.升級(jí),每個(gè)節(jié)點(diǎn)都能做備份

這里假設(shè)一個(gè)節(jié)點(diǎn)掛了,可以通過(guò)Zookeeper的一個(gè)選舉功能在選舉出新的Controller?

broker架構(gòu)

為啥生產(chǎn)者和消費(fèi)者指向同一個(gè)broker呢

因?yàn)閿?shù)據(jù)是有主題的,主題是有分區(qū)的,分區(qū)是有副本的,一個(gè)叫l(wèi)eader,一個(gè)叫foller

指向同一個(gè)broker是因?yàn)樗麑?duì)應(yīng)的分區(qū)是leader副本,分區(qū)管理器會(huì)將其同步到文件

集群部署

我們知道kafka一般是以集群方式出現(xiàn)

為了模擬,我們也部署一下集群

這時(shí)候解壓三個(gè)kafka到不同文件夾,修改data配置以及端口即可

可以設(shè)置為9091 9092 9093??

注意Zookeeper也得配置

可以寫(xiě)批處理腳本,這樣運(yùn)行起來(lái)更加方便

出現(xiàn)以下問(wèn)題就將其放在根目錄下或者將文件夾名改短

可視化工具創(chuàng)建主題

注:這里副本數(shù)量超過(guò)節(jié)點(diǎn)數(shù)量不會(huì)創(chuàng)建成功,因?yàn)橐粋€(gè)節(jié)點(diǎn)放多個(gè)副本是無(wú)意義的

Zookeeper的作用

我們簡(jiǎn)述一下Zookeeper的作用

1.Controller的選舉

選舉規(guī)則就是比較隨意,第一個(gè)建立和zookeeper建立連接的broker節(jié)點(diǎn)就是 controller 然后其他的節(jié)點(diǎn)來(lái)建立連接的時(shí)候也想創(chuàng)建,但是controller已經(jīng)有了,之后的節(jié)點(diǎn)就是放一個(gè)監(jiān)聽(tīng)器,假設(shè)現(xiàn)在的Controller掛了,這個(gè)監(jiān)聽(tīng)器就起作用了,從其余的broker中選舉出新的broker

2.對(duì)節(jié)點(diǎn)的監(jiān)聽(tīng)

Znode節(jié)點(diǎn)有個(gè)監(jiān)聽(tīng)功能 可以使用kafka對(duì)節(jié)點(diǎn)進(jìn)行監(jiān)聽(tīng)到節(jié)點(diǎn)的變化 數(shù)據(jù)的變化 連接超時(shí)... 監(jiān)聽(tīng)到以后馬上通知kafka進(jìn)行對(duì)應(yīng)的處理

Controller和Broker之間的通信

第一個(gè)broker啟動(dòng)的流程 1.注冊(cè)broker節(jié)點(diǎn) 監(jiān)聽(tīng)controller節(jié)點(diǎn) 2.注冊(cè)controller節(jié)點(diǎn) 選舉成為controller,監(jiān)聽(tīng)/broker/ids節(jié)點(diǎn) 因?yàn)閎roker啟動(dòng)就會(huì)創(chuàng)建ids,所以這里的監(jiān)聽(tīng)主要就是看看ids的變化,是否有新的節(jié)點(diǎn)創(chuàng)建了 第二個(gè)節(jié)點(diǎn)加入之后監(jiān)聽(tīng)器就知道了,會(huì)通知broker1集群的變化 然后在第二個(gè)broker進(jìn)來(lái)之后還會(huì)和第一個(gè)節(jié)點(diǎn)連接 傳輸一些集群的信息等等 但是第三個(gè)節(jié)點(diǎn)連接上來(lái)之后,controller會(huì)給兩個(gè)broker都發(fā)送相關(guān)的集群信息

這也就是說(shuō),每當(dāng)有節(jié)點(diǎn)連上了之后,controller就會(huì)向各個(gè)節(jié)點(diǎn)發(fā)送對(duì)應(yīng)的集群信息

Broker組件

主要是包含日志組件? 網(wǎng)絡(luò)客戶端? 副本管理器 controller信息? kafka apis(負(fù)責(zé)處理數(shù)據(jù))? ? ? ? ?Zookeeper的客戶端等等

手動(dòng)創(chuàng)建主題

我們之前的主題使用的都是默認(rèn)參數(shù)自動(dòng)創(chuàng)建的,我們?nèi)绻胄薷钠渲械膮?shù)就得手動(dòng)創(chuàng)建對(duì)應(yīng)的admin管理員對(duì)象 從而對(duì)他的副本信息分區(qū)信息進(jìn)行設(shè)置

import org.apache.kafka.clients.admin.Admin;

import org.apache.kafka.clients.admin.AdminClientConfig;

import org.apache.kafka.clients.admin.CreateTopicsResult;

import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;

import java.util.HashMap;

import java.util.Map;

public class AdminTopicTest {

public static void main(String[] args) {

Map configs = new HashMap<>();

configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

//TODO 創(chuàng)建管理員

Admin admin = Admin.create(configs);

//TODO 創(chuàng)建主題

//第一個(gè)參數(shù)是主題名

//第二個(gè)參數(shù)是分區(qū)的數(shù)量 int

//第三個(gè)參數(shù)是副本的因子(本質(zhì)是數(shù)量) short

String topicName = "test1";

int partitionCount = 1;

short replicationCount = 1;

NewTopic topic = new NewTopic(topicName,partitionCount,replicationCount);

String topicName2 = "test2";

int partitionCount2 = 2;

short replicationCount2 = 2;

NewTopic topic2 = new NewTopic(topicName2,partitionCount2,replicationCount2);

CreateTopicsResult result = admin.createTopics(

Arrays.asList(topic, topic2)

);

//TODO 關(guān)閉管理者對(duì)象

admin.close();

}

}

?副本分配策略

主題只是邏輯上的分類(lèi),只有分區(qū)才能在物理文件上以及存儲(chǔ)中有所體現(xiàn)

我們知道我們是使用多個(gè)副本冗余來(lái)提高數(shù)據(jù)的可靠性的

那么副本在節(jié)點(diǎn)中又是如何分配的呢,咱們接下來(lái)慢慢說(shuō)

先說(shuō)理想的情況

我們知道副本也分為leader和follower

我們這里的均衡指的是leader的分布應(yīng)該是均勻的

我們先說(shuō)理想情況下

我們希望每個(gè)節(jié)點(diǎn)的leader數(shù)量都是相近的

實(shí)際上kafka并不是這樣的

因?yàn)楦北镜膭?chuàng)建是有順序的,我們無(wú)法再一開(kāi)始就預(yù)測(cè)浩好這里的副本分配

kafka是采用一個(gè)簡(jiǎn)單的分配算法來(lái)進(jìn)行的副本分配

例子

注:我們也可以自己手動(dòng)分配

一個(gè)重要的名詞? ISR? in-sync-Replication? ?就是同步副本列表的意思

主題創(chuàng)建流程

大概就是先問(wèn)一下controller在哪,通過(guò)controller來(lái)創(chuàng)建topic

但是底層有很多生產(chǎn)者消費(fèi)者模型

這里的具體操作由apis接口來(lái)實(shí)現(xiàn)

生產(chǎn)數(shù)據(jù)

一般主題都是提前創(chuàng)建好的,如果使用自動(dòng)創(chuàng)建的話很可能導(dǎo)致IO熱點(diǎn)問(wèn)題

因?yàn)楦北镜膌eader都在同一個(gè)節(jié)點(diǎn)上

具體流程如下圖

生產(chǎn)者數(shù)據(jù)先通過(guò)攔截器的攔截,然后去元數(shù)據(jù)區(qū)獲取controller的信息,然后進(jìn)行序列化(因?yàn)槭峭ㄟ^(guò)網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)),在通過(guò)分區(qū)器確定分區(qū),最后加入緩沖區(qū)等待發(fā)送

注:攔截器是對(duì)數(shù)據(jù)進(jìn)行了一些規(guī)范化的處理,但是出現(xiàn)錯(cuò)誤之后不會(huì)導(dǎo)致程序的停止,不影響數(shù)據(jù)的發(fā)送,捕捉到異常也不會(huì)進(jìn)行處理

然后通過(guò)發(fā)送線程繼續(xù)發(fā)送數(shù)據(jù),這里的在途請(qǐng)求緩沖區(qū)的大小表示一個(gè)節(jié)點(diǎn)在同一時(shí)間最多處理的請(qǐng)求數(shù)量,默認(rèn)是5,這是經(jīng)過(guò)壓力測(cè)試的,這樣性能最優(yōu)

分區(qū)器

我們剛剛看到數(shù)據(jù)會(huì)經(jīng)過(guò)分區(qū)器處理來(lái)知道放到哪個(gè)分區(qū),下面我們介紹一下分區(qū)器是怎么工作的,分區(qū)器是從元數(shù)據(jù)區(qū)獲取到主題信息再開(kāi)始計(jì)算分區(qū)的,注意這里根據(jù)的主題信息直接指定分區(qū)的話是不會(huì)做校驗(yàn)而是直接使用的

算法

分區(qū)算法,將key使用散列算法后和分區(qū)數(shù)進(jìn)行一次取模運(yùn)算,在寫(xiě)入數(shù)據(jù)收集器的時(shí)候,就需要進(jìn)行處理了,如果當(dāng)前主題分區(qū)是未知分區(qū),就會(huì)根據(jù)當(dāng)前主題分區(qū)的負(fù)載情況動(dòng)態(tài)進(jìn)行分區(qū)(粘性分區(qū)策略)如果當(dāng)前發(fā)送的時(shí)候沒(méi)有分區(qū)負(fù)載情況,這時(shí)候就是隨機(jī)選擇的,選擇以后就盡可能向這里添加,超過(guò)閾值就會(huì)切換另外一個(gè)分區(qū),閾值默認(rèn)是16k,后面不為空之后就會(huì)根據(jù)每個(gè)分區(qū)的負(fù)載情況生成一個(gè)隨機(jī)的權(quán)重,然后通過(guò)一個(gè)二分查找找到一個(gè)和這個(gè)值相近的,然后算出來(lái)分區(qū)編號(hào)

緩沖區(qū)

緩沖區(qū)中對(duì)數(shù)據(jù)的追加是只要批次大小足夠,沒(méi)到達(dá)閾值,直接向后追加即可

批次對(duì)象空間不足 將滿了的批次對(duì)象鎖定并關(guān)閉,等著sender線程來(lái)拿,然后重新開(kāi)一個(gè)批次對(duì)象來(lái)追加這里的數(shù)據(jù) 數(shù)據(jù)是可以超過(guò)16k的,比如60k的數(shù)據(jù),直接裝,然后關(guān)閉準(zhǔn)備發(fā)車(chē),是不可以拆開(kāi)的

sender發(fā)送線程

會(huì)將符合發(fā)送條件的數(shù)據(jù)重新進(jìn)行整合,前面是因?yàn)橄嗤黝}的不同分區(qū)可能在不同的broker中,但是不同主題的分區(qū)可能在相同的broker中,用topic進(jìn)行區(qū)分效率更高一點(diǎn)

批次對(duì)象到達(dá)大小或者是時(shí)間閾值之后就會(huì)被發(fā)送

應(yīng)答機(jī)制 ACKS

本質(zhì)上是使用異步的方式

發(fā)送數(shù)據(jù)無(wú)需等待應(yīng)答以后再繼續(xù)發(fā)送

這樣數(shù)據(jù)的發(fā)送效率高了,但是安全性無(wú)法保證

Kafka就面對(duì)不同的場(chǎng)景給出了三個(gè)ACKS處理等級(jí)

分別是 0 1? all

0就是優(yōu)先考慮效率

all就是優(yōu)先考慮數(shù)據(jù)的安全性

1就是兩者之間的折中考慮

ACKS = 0

表示只是將數(shù)據(jù)放到網(wǎng)絡(luò)中了,根本不關(guān)心其是否發(fā)送完成,直到放到網(wǎng)絡(luò)中就給main線程發(fā)送一個(gè)應(yīng)答

ACKS等級(jí)為1的時(shí)候是需要數(shù)據(jù)在leader中進(jìn)行保存到文件中之后才能應(yīng)答

all等級(jí)是等待數(shù)據(jù)進(jìn)行備份之后才進(jìn)行對(duì)應(yīng)的應(yīng)答

retry重試機(jī)制

我們知道數(shù)據(jù)既然是在網(wǎng)絡(luò)中傳輸?shù)?那么數(shù)據(jù)丟包是很正常的,假設(shè)網(wǎng)絡(luò)不穩(wěn)定等等情況就很容易導(dǎo)致數(shù)據(jù)的丟包等等

我們這時(shí)候和tcp協(xié)議一樣定義了數(shù)據(jù)的重傳機(jī)制

只要主線程沒(méi)有收到acks,到達(dá)一定的超時(shí)時(shí)間,這時(shí)候就會(huì)將數(shù)據(jù)再次放回緩沖區(qū)重新進(jìn)行一次發(fā)送

但是這也會(huì)導(dǎo)致一定的問(wèn)題,比如數(shù)據(jù)重復(fù)多次或者是數(shù)據(jù)亂序問(wèn)題

好處是讓數(shù)據(jù)更安全,但是也有壞處

數(shù)據(jù)重復(fù):

假設(shè)這里leader寫(xiě)入了磁盤(pán)了,但是傳ack的時(shí)候網(wǎng)絡(luò)不穩(wěn)定,沒(méi)發(fā)成,這里就會(huì)再傳一次 這里數(shù)據(jù)就在文件中放了兩次

數(shù)據(jù)亂序:

還有一個(gè)問(wèn)題就是數(shù)據(jù)的順序問(wèn)題,發(fā)生順序是 a b c 但是可能 a發(fā)送失敗重發(fā)了 結(jié)果就是b c a 的順序了

這在某種情況下不是我們想看到的,于是我們又引入了冪等性操作和事務(wù)的概念

冪等性

冪等性要求數(shù)據(jù)的ACKS等級(jí)一定是all或者-1? (這倆等級(jí)一個(gè)意思),并且必須開(kāi)啟retry ,并且要求在途請(qǐng)求緩沖區(qū)的數(shù)量必須小于等于5

實(shí)現(xiàn)?

就是給數(shù)據(jù)標(biāo)上生產(chǎn)者編號(hào),標(biāo)上數(shù)據(jù)序號(hào),但是注意這里的冪等性不可以跨越客戶端或者是跨越分區(qū)來(lái)起作用

這里的冪等性只是在同一個(gè)分區(qū)內(nèi)的冪等

數(shù)據(jù)在發(fā)送給Kafka的時(shí)候,kafka會(huì)記錄生產(chǎn)者的狀態(tài)

重復(fù)是靠在加入在途數(shù)據(jù)緩沖區(qū)的時(shí)候判斷一下contains

有序是按照序號(hào)來(lái)的,下一個(gè)加入的數(shù)據(jù)必須大于當(dāng)前的最后一個(gè)數(shù)據(jù)

缺陷:

只能保證一個(gè)分區(qū)的數(shù)據(jù)是有序且不重復(fù)的

但是如果這時(shí)候生產(chǎn)者重啟了,此時(shí)仍然會(huì)導(dǎo)致數(shù)據(jù)的重復(fù)

這就要通過(guò)下面的事務(wù)來(lái)完成對(duì)這個(gè)缺點(diǎn)的補(bǔ)充了

事務(wù)

這里的事務(wù)是基于冪等性的,和數(shù)據(jù)庫(kù)中的事務(wù)完全不是一個(gè)意思

基本原理是保證生產(chǎn)者id重啟前后不會(huì)改變

執(zhí)行順序如下

注:事務(wù)這里的發(fā)送數(shù)據(jù)不是通過(guò)send方法進(jìn)行發(fā)送,而是commit才會(huì)發(fā)送,send只是將數(shù)據(jù)放到緩沖區(qū)

柚子快報(bào)邀請(qǐng)碼778899分享:分布式 Kafka基礎(chǔ) (上)

http://yzkb.51969.com/

參考閱讀

評(píng)論可見(jiàn),查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19240922.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問(wèn)

文章目錄