柚子快報(bào)激活碼778899分享:學(xué)習(xí)筆記 | Kafka
柚子快報(bào)激活碼778899分享:學(xué)習(xí)筆記 | Kafka
一、概述
定義
1、Kafka傳統(tǒng)定義:Kafka 是一個(gè)分布式的基于 發(fā)布/訂閱模式 的消息隊(duì)列(Message Queue) ,主要應(yīng)用與大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。
2、發(fā)布/訂閱:消息的發(fā)送者不會(huì)將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息分為不同的類別,訂閱者只接受自己感興趣的消息。
3、Kafka 最新定義:Kafka是一個(gè)開源的 分布式事件流平臺 (Event Streaming Platfrom),被數(shù)千家公司用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用。
消息隊(duì)列的應(yīng)用場景
傳統(tǒng)的消息隊(duì)列主要應(yīng)用場景包括: 緩存/削峰、解耦和異步通信。
緩存/削峰
所有數(shù)據(jù)可以全部緩存到消息隊(duì)列,服務(wù)器可以根據(jù)自己處理的性能按一定的頻率去消息隊(duì)列中取。
解耦
減少服務(wù)之間的直接調(diào)用,由消息隊(duì)列充當(dāng)中間者。
異步通信
一個(gè)業(yè)務(wù)可以將優(yōu)化體驗(yàn)(發(fā)短信)的動(dòng)作放到消息隊(duì)列中,由專門的服務(wù)去處理,達(dá)到快速響應(yīng)上游。
消息隊(duì)列的倆種模式
1)點(diǎn)對點(diǎn)模式
消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后清除數(shù)據(jù)。
2)發(fā)布/訂閱模式
一個(gè)隊(duì)列可以有多個(gè)topic主題。(topic對消息進(jìn)行分類,消費(fèi)者可以自己需求拿消息)消費(fèi)者消費(fèi)數(shù)據(jù)之后,不刪除數(shù)據(jù)。每個(gè)消費(fèi)者相互獨(dú)立,都可以拿到消費(fèi)數(shù)據(jù)。
Kafka的基礎(chǔ)架構(gòu)
1、為方便擴(kuò)展,并提高吞吐量,一個(gè) Topic 分為多個(gè) partition(分區(qū))
2、配合分區(qū)的設(shè)計(jì),提出了消費(fèi)者組的概念,組內(nèi)每個(gè)消費(fèi)者并行消費(fèi),一個(gè)分區(qū)只能讓一個(gè)消費(fèi)者消費(fèi)。
3、為了提高可用性,為每個(gè) partition 增加諾干副本進(jìn)行備份(分為leader 和 follower)消費(fèi)者只找learder,當(dāng)leader掛掉的時(shí)候,follower符合條件時(shí)會(huì)變成leader。
4、zookerper存儲節(jié)點(diǎn)信息,有哪些副本。
二、入門
Kafka的基本命令
Topic命令
查看有多少主題
kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --list
新增主題
kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --topic first --create --partitions 1 --replication-factor 3
查看主題詳情
kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --topic first --describe
修改主題
只能加不能減
kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --topic first --alter --partitions 3
命令行操作
創(chuàng)建一個(gè)生產(chǎn)者
kafka-console-producer.sh --bootstrap-server 192.168.204.10:9092 --topic second
創(chuàng)建一個(gè)消費(fèi)者
kafka-console-consumer.sh --bootstrap-server 192.168.204.10:9092 --topic second
可以查看到歷史數(shù)據(jù)
kafka-console-consumer.sh --bootstrap-server 192.168.204.10:9092 --topic second --from-beginning
三、生產(chǎn)者
原理
在消息發(fā)送的過程中,涉及到了倆個(gè)線程 -- main 和 Sender。在main線程中創(chuàng)建了 一個(gè)雙端隊(duì)列 RecordAccumulator 。main線程將消息發(fā)送給RecordAccumulator ,Sender 線程不斷從RecordAccumulator 中拉取消息發(fā)送給Kafka Broker。
異步發(fā)送
當(dāng)main線程發(fā)送到RecordAccumulator之后就結(jié)束了,不管接下去的操作。
示例代碼:
//配置參數(shù)
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//創(chuàng)建KafkaProducer
KafkaProducer
kafkaProducer.send(new ProducerRecord<>("second","hello"));
//釋放資源
kafkaProducer.close();
回調(diào)異步發(fā)送
相對于異步發(fā)送,就是多了一個(gè)發(fā)送成功之后處理的函數(shù)。
示例代碼:
//配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//創(chuàng)客KafkaProducer
KafkaProducer
kafkaProducer.send(new ProducerRecord<>("second", "hello"), (recordMetadata, e) -> {
System.out.println(recordMetadata.toString());
System.out.println("send success");
});
//釋放資源
kafkaProducer.close();
同步發(fā)送
同步發(fā)送就是main線程需要等sender線程將雙端隊(duì)列中的數(shù)據(jù)發(fā)送出去才能繼續(xù)往下面操作。
示例代碼:
//配置參數(shù)
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//創(chuàng)建KafkaProducer
KafkaProducer
try {
kafkaProducer.send(new ProducerRecord<>("second","hello")).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//釋放資源
kafkaProducer.close();
分區(qū)
Kafka分區(qū)好處
1、便于合理使用存儲資源,每個(gè)Partition 在一個(gè)Broker上存儲,可以把海量數(shù)據(jù)按照分區(qū)切割成一塊一塊存儲在多臺Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果。
2、提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù)。
分區(qū)策略
自定義分區(qū)器
1、定義自己的分區(qū)器
package cn.swj.kafka;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* @Author suweijie
* @Date 2023/8/30 21:40
* @Description: TODO
* @Version 1.0
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
String msg = o1.toString();
if(msg.contains("suweijie")) {
return 1;
}
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map
}
}
2、添加配置
//配置參數(shù)
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//自定義分區(qū)器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName())
//創(chuàng)建KafkaProducer
KafkaProducer
try {
kafkaProducer.send(new ProducerRecord<>("second","hello")).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//釋放資源
kafkaProducer.close();
提高生產(chǎn)者的吞吐量
batch.size: 批次的大小默認(rèn)是16k(16384b) ,但是這個(gè)參數(shù)要跟linger.ms 配合才有用
linger.ms: 等待時(shí)間,修改為 5-100ms ,修改這個(gè)會(huì)造成數(shù)據(jù)的延遲。
RecordAccumulator: 雙端隊(duì)列的緩存區(qū)大小,修改為64m (33554432b)
compression.type : 壓縮snappy, none(默認(rèn))、gzip、snappy(用的比較多)、lz4、zstd
最佳實(shí)踐:
batch.size = 32768
linger.ms = 5
buffer.memory = 33554432
compression.type = snappy
數(shù)據(jù)可靠性
應(yīng)答ACKS
0: 生產(chǎn)者發(fā)過來的數(shù)據(jù),不需要等待數(shù)據(jù)落盤應(yīng)答。1: 生產(chǎn)者發(fā)過來的數(shù)據(jù),需要等待Leader收到之后應(yīng)答。-1(all): 生產(chǎn)者發(fā)過來的數(shù)據(jù),需要等Leader+ 和 isr 隊(duì)列里面所有的節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。-1 和 all等價(jià)。
spring:
kafka:
bootstrap-servers: 192.168.204.10:9092,192.168.204.10:9093,192.168.204.10:9094
consumer:
group-id: 1
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
producer:
acks: -1 #ack機(jī)制 0 1 -1
batch-size: 32768 #批次大小
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
compression-type: snappy #數(shù)據(jù)壓縮
retries: 5 #重試次數(shù)
buffer-memory: 33554432 #雙端隊(duì)列的緩沖區(qū)大小
linger-ms: 5 # sender 等待時(shí)間
數(shù)據(jù)重復(fù)
冪等性特性
配置:
enable:
idempotence: true #開啟冪等性 默認(rèn)開啟
但是Kafka掛掉之后會(huì)重新生成一個(gè)PID,所以也是有可能會(huì)產(chǎn)生重復(fù)數(shù)據(jù)。
生產(chǎn)者事務(wù)
開啟事務(wù)、必須得開啟冪等性
示例代碼:
private void transaction() {
//配置參數(shù)
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.LINGER_MS_CONFIG,5); //sender 發(fā)送的等待時(shí)間 ,當(dāng)達(dá)到這個(gè)時(shí)間的時(shí)候Sender 會(huì)直接發(fā)
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); //開啟冪等性,默認(rèn)開啟
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); //設(shè)置雙端隊(duì)列的大小 64m
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32768); //批次的大小 32k ,當(dāng)批次達(dá)到這個(gè)大小的時(shí)候,Sender會(huì)直接發(fā)送
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //數(shù)據(jù)的壓縮方式
properties.put(ProducerConfig.RETRIES_CONFIG,5); //發(fā)送失敗的重試次數(shù)
properties.put(ProducerConfig.ACKS_CONFIG,-1); // acks的方式 -1 當(dāng)leader 收到并且和isr 隊(duì)列里面所有的節(jié)點(diǎn)同步才應(yīng)答。
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"123"); //事務(wù)唯一id
//自定義分區(qū)器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());
//創(chuàng)建KafkaProducer
KafkaProducer
kafkaProducer.initTransactions(); //初始化事務(wù)
kafkaProducer.beginTransaction(); //開啟事務(wù)
try {
kafkaProducer.send(new ProducerRecord<>("second","hello"));
kafkaProducer.commitTransaction(); //事務(wù)
} catch (Exception e) {
e.printStackTrace();
kafkaProducer.abortTransaction();
}
//釋放資源
kafkaProducer.close();
}
數(shù)據(jù)有序
同分區(qū)內(nèi)消費(fèi)者可以實(shí)現(xiàn)數(shù)據(jù)的有序消費(fèi),不同分區(qū)內(nèi)消費(fèi)者如何實(shí)現(xiàn)有序消費(fèi)?TODO
數(shù)據(jù)亂序問題
產(chǎn)生的原因:
1、默認(rèn) broker 最多緩存5個(gè)請求
2、當(dāng)sender一直在發(fā)送數(shù)據(jù)的時(shí)候,當(dāng)有一條數(shù)據(jù)發(fā)送失敗需要返回雙端隊(duì)列進(jìn)行重發(fā),就會(huì)產(chǎn)生數(shù)據(jù)亂序的問題。
解決方案:
1) kafka 在 1.x 版本之前確保單分區(qū)下數(shù)據(jù)有序需要增加以下配置:
max.in.flight.requests.per.connection = 1
1) kafka在 1.x 以及之后的版本確保單分區(qū)下的額數(shù)據(jù)有序,條件如下:
(1) 未開啟冪等性
max.in.flight.requests.per.connection 設(shè)置為1
(2)開啟冪等性
max.in.flight.requests.per.connection 設(shè)置小于5
原理:在kafka1.x 版本以后,啟用冪等性后,kafka broker 會(huì)緩存producer 發(fā)來的最近5個(gè)request 的元數(shù)據(jù),如果數(shù)據(jù)亂序會(huì)將亂序的數(shù)據(jù)保存在內(nèi)存中,重新排序之后在落盤。
四、Broker
ZK存儲
啟動(dòng)zkCli.sh:
docker exec -it zookeeper-server bash
#進(jìn)入之后啟動(dòng)zkCli.sh
bin/zkCli.sh
ls /brokers/ids
get /brokers/topics/second/partitions/0/state
get /controller
/brokes/ids : 記錄有哪些節(jié)點(diǎn)
/brokers/topics/主題/patitions/0/state : 記錄著leader、isr隊(duì)列
/controller : 輔助選舉leader
Broker工作原理
AR: kafka 分區(qū)中所有的副本統(tǒng)稱
工作流程:
1) broker 啟動(dòng)會(huì)在zk中注冊
2) controller 誰注冊,誰說了算
3) 由選舉出來的controller 監(jiān)聽 brokers 節(jié)點(diǎn)變化
4) Controller 決定 Leader 的選舉
選舉規(guī)則:
在isr隊(duì)列中存活為前提,安裝ARa中排在最前面的優(yōu)先。例如 ar[1,0,2]、isr[1,0,2],那么leader 就會(huì)按照1,0,2的順序輪詢。
5) 主broker的Controller,會(huì)將所有節(jié)點(diǎn)的信息上傳到zk
6) 其他節(jié)點(diǎn)的controller 會(huì)去從zk同步相關(guān)信息下來。
7) 假設(shè)broker掛了
8) 監(jiān)聽到broker節(jié)點(diǎn)變化
9) 獲取isr
10) 選舉新的leader
11) 更新leader 以及 isr
新節(jié)點(diǎn)的服役以及退役(沒聽懂)
新節(jié)點(diǎn)服役
docker run -d --name kafka3 \
--network kafka-net \
-p 9095:9095 \
-e KAFKA_BROKER_ID=3 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.204.10:9095 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9095 \
-e TZ="Asia/Shanghai" \
wurstmeister/kafka:latest
查看在新節(jié)點(diǎn)是否有主題信息(指定這臺broker的地址,查看是否有主題信息)
kafka-topics.sh --bootstrap-server 192.168.204.10:9094 --topic first --describe
服役新節(jié)點(diǎn)、正確退役舊節(jié)點(diǎn)
五、Kafka 副本
基本信息
1)Kafka 副本作用: 提高數(shù)據(jù)的可靠性。
2)Kafka默認(rèn)的副本數(shù)為1,生產(chǎn)環(huán)境正常配置倆個(gè),保證數(shù)據(jù)的可靠性;太多副本會(huì)增加磁盤的存儲空間,增加網(wǎng)絡(luò)上數(shù)據(jù)傳輸,降低效率。
3)Kafka 中副本分為: Leader 和 Follower。Kafka生產(chǎn)者只會(huì)把數(shù)據(jù)發(fā)送到Leader,然后Follower 自己去找Leader 同步。
4)Kafka 分區(qū)中的所有副本統(tǒng)稱為AR(Assigned Replicas)。
AR = ISR + OSR
ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 長時(shí)間未向 Leader 發(fā)送通信請求或同步數(shù)據(jù),則該 Follower 將被踢出ISR。該時(shí)間閩值由 replica.lagtime.max.ms參數(shù)設(shè)定,默認(rèn) 30s。Leader 發(fā)生故障之后,就會(huì)從ISR 中選舉新的 Leader。
OSR,表示 Follower 與 Leader 副本同步時(shí),延遲過多的副本。
Leader的選舉流程
Follower的故障
Leader的故障
柚子快報(bào)激活碼778899分享:學(xué)習(xí)筆記 | Kafka
參考閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。