柚子快報激活碼778899分享:java kafka全解
目錄
Kafka概述定義消息隊列目錄結(jié)構(gòu)分析傳統(tǒng)消息隊列的應(yīng)用場景消息隊列的兩種模式點對點模式發(fā)布/訂閱模式
Kafka基礎(chǔ)架構(gòu)
Kafka快速入門安裝部署集群規(guī)劃集群部署集群啟停腳本
Kafka命令行操作Kafka基礎(chǔ)架構(gòu)主題命令行操作生產(chǎn)者命令行操作消費者命令行操作
kafka可視化工具Kafka重要概念brokerzookeeperproducer(生產(chǎn)者)consumer(消費者)consumer group(消費者組)分區(qū)(Partitions)副本(Replicas)主題(Topic)偏移量(offset)消費者組
Kafka生產(chǎn)者生產(chǎn)者消息發(fā)送流程發(fā)送原理生產(chǎn)者重要參數(shù)列表異步發(fā)送API普通異步發(fā)送帶回調(diào)函數(shù)的異步發(fā)送
同步發(fā)送API
生產(chǎn)者分區(qū)分區(qū)和副本機制分區(qū)好處輪詢策略隨機策略(不用)按key分配策略亂序問題
副本機制producer的ACKs參數(shù)acks配置為0acks配置為1acks配置為-1或者all
Kafka生產(chǎn)者冪等性與事務(wù)冪等性Kafka生產(chǎn)者冪等性冪等性原理
Kafka事務(wù)事務(wù)操作API
數(shù)據(jù)有序和數(shù)據(jù)亂序
Kafka BrokerZookeeper存儲的Kafka信息Kafka Broker總體工作流程Broker重要參數(shù)Kafka副本副本基本信息Leader 選舉流程Leader 和 Follower 故障處理細節(jié)活動調(diào)整分區(qū)副本存儲Leader Partition 負載平衡
文件存儲Topic 數(shù)據(jù)的存儲機制文件清理策略
Kafka 消費者Kafka 消費方式Kafka 消費者工作流程消費者組原理消費者重要參數(shù)offset 位移offset 的默認維護位置自動提交offset手動提交offset指定Offset消費
Kafka-Kraft模式Kafka-Kraft架構(gòu)
Go kafkaKafka簡介Kafka的結(jié)構(gòu)Producerkafka clusterBrokerTopicPartitionReplication
ConsumerConsumer Group
Kafka?作流程選擇partition的原則(面試重點)ACK應(yīng)答機制(面試重點)Topic和數(shù)據(jù)?志Partition結(jié)構(gòu)消費數(shù)據(jù)
kafka環(huán)境搭建java環(huán)境變量安裝kafka
GO操作Kafkasarama操作kafka依賴安裝連接kafka發(fā)送消息連接kafka消費消息
kafka-go操作kafka準備Kafka環(huán)境安裝kafka-goConnection發(fā)送消息消費消息創(chuàng)建topic通過非leader節(jié)點連接leader節(jié)點獲取topic列表
Reader消費消息消費者組顯式提交管理提交間隔
Writer發(fā)送消息創(chuàng)建不存在的topic寫入多個topic
其他配置TLSSASLBalancerCompressionLogging
FAQKafka中的消費者組(Consumer Group)的理解消費者組的概念消費者組內(nèi)的消息分配消息分發(fā)規(guī)則實際例子場景1:消費者組內(nèi)的消費者數(shù)量小于分區(qū)數(shù)量場景2:增加消費者以提高處理能力場景3:消費者數(shù)量多于分區(qū)數(shù)量
示例代碼1. 單個消費者2. 增加消費者
注意事項
Kafka概述
定義
Kafka傳統(tǒng)定義: Kafka是一個分布式的基于發(fā)布/訂閱模式的消息隊列(Message Queue),主要應(yīng)用于大數(shù)據(jù)實時處理領(lǐng)域。
發(fā)布/訂閱:消息的發(fā)布者不會將消息直接發(fā)布給特定的訂閱者,而是將發(fā)布的消息分為不同的類別,訂閱者只接收感興趣的消息。
Kafka最新定義:Kafka是一個開源的分布式事件流平臺(Event Streaming Platform),被數(shù)千家公司用于高性能的數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用。
消息隊列
目前企業(yè)中比較常見的消息隊列產(chǎn)品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。
在大數(shù)據(jù)場景主要采用Kafka作為消息隊列。在JavaEE開發(fā)中主要采用ActiveMQ、RabbitMQ、RocketMQ。
目錄結(jié)構(gòu)分析
bin:Kafka的所有執(zhí)行腳本都在這里。例如:啟動Kafka服務(wù)器、創(chuàng)建Topic、生產(chǎn)者、消費者程序等等config:Kafka的所有配置文件libs: 運行Kafka所需要的所有JAR包logs: Kafka的所有日志文件,如果Kafka出現(xiàn)一些問題,需要到該目錄中去查看異常信息site-docs: Kafka的網(wǎng)站幫助文件
傳統(tǒng)消息隊列的應(yīng)用場景
傳統(tǒng)的消息隊列的主要應(yīng)用場景包括**:緩存/消峰、解耦和異步通信**。
緩沖/消峰: 有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費消息的處理速度不一致的情況。
解耦:允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
異步通信:允許用戶把一個消息放入隊列,但并不立即處理它,然后再需要的時候再去處理它們。
消息隊列的兩種模式
點對點模式
消費者主動拉去數(shù)據(jù),消息收到后清除消息
發(fā)布/訂閱模式
可以有多個topic主題(瀏覽,點贊,收藏,評論等)消費者消費數(shù)據(jù)之后,不刪除數(shù)據(jù)每個消費者互相獨立,都可以消費到數(shù)據(jù)
Kafka基礎(chǔ)架構(gòu)
1、為方便擴展,并提高吞吐量,一個topic分為多個partition
2、配合分區(qū)的設(shè)計,提出消費者組的概念,組內(nèi)每個消費者并行消費
3、為提高可用性,為每個partition增加若干副本,類似NameNode HA
4、ZK中記錄誰是leader,Kafka2.8.0 以后也可以配置不采用ZK.
Producer:消息生產(chǎn)者,就是向Kafka broker 發(fā)消息的客戶端。 Consumer:消息消費者,向Kafka broker 取消息的客戶端。 Consumer Group(CG):消費者組,由多個consumer組成。消費者組內(nèi)每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)消費者消費;消費者組之間互不影響。所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。 Broker:一臺Kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。 Topic: 可以理解為一個隊列,生產(chǎn)者和消費者面向的都是一個topic。 Partition: 為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。 Replica:副本。一個topic的每個分區(qū)都有若干個副本,一個Leader和若干個Follower。 Leader:每個分區(qū)多個副本的 “主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費者消費數(shù)據(jù)的對象都是Leader。 Follower:每個分區(qū)多個副本中的 “從”,實時從 Leader 中同步數(shù)據(jù),保持和 Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時,某個Follower會成為新的 Leader。
Kafka快速入門
安裝部署
集群規(guī)劃
Hadoop102Hadoop103Hadoop104zkzkzkkafkakafkakafka
集群部署
docker部署zk集群:參考《zk全解》 進入到/usr/local/kafka目錄,修改配置文件 vim server.properties
#broker 的全局唯一編號,不能重復(fù),只能是數(shù)字。
broker.id=0
#處理網(wǎng)絡(luò)請求的線程數(shù)量
num.network.threads=3
#用來處理磁盤 IO 的線程數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka 運行日志(數(shù)據(jù))存放的路徑,路徑不需要提前創(chuàng)建,kafka 自動幫你創(chuàng)建,可以
# 配置多個磁盤路徑,路徑與路徑之間可以用","分隔
log.dirs=/opt/module/kafka/datas
# 監(jiān)聽所有網(wǎng)卡地址,允許外部端口連接
listeners=PLAINTEXT://0.0.0.0:9092
#topic 在當前 broker 上的分區(qū)個數(shù)
num.partitions=1
#用來恢復(fù)和清理 data 下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
# 每個 topic 創(chuàng)建時的副本數(shù),默認時 1 個副本
offsets.topic.replication.factor=1
#segment 文件保留的最長時間,超時將被刪除
log.retention.hours=168
#每個 segment 文件的大小,默認最大 1G
log.segment.bytes=1073741824
# 檢查過期數(shù)據(jù)的時間,默認 5 分鐘檢查一次是否數(shù)據(jù)過期
log.retention.check.interval.ms=300000
#配置連接 Zookeeper 集群地址(在 zk 根目錄下創(chuàng)建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
可以提前在hosts文件中配置master,slave1,slave2的ip,之前在學(xué)習(xí)k8s的時候我已經(jīng)配置過了,可以直接拿來用。 listeners=PLAINTEXT://0.0.0.0:9092 ,默認情況下,advertised.listeners不設(shè)置的話,則默認使用listeners的屬性,然而advertised.listeners是不支持0.0.0.0的,所以需要指定暴露的監(jiān)聽器,如下 listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://虛擬機ip:9092
將安裝包拷貝到其他服務(wù)器 分別在hadoop103和hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2 配置環(huán)境變量
在/etc/profile.d/my_env.sh 文件中增加 kafka 環(huán)境變量配置 sudo vim /etc/profile.d/my_env.sh
增加如下內(nèi)容:
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
這里我將kafka直接放在了根目錄下的一個文件夾,更加方便:
刷新一下環(huán)境變量。 source /etc/profile
分發(fā)環(huán)境變量文件到其他節(jié)點,并 source。 sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
source /etc/profile
source /etc/profile
分別啟動kafka:
bin/kafka-server-start.sh -daemon config/server.properties
如果遇到cluser_id不符合的問題,直接將日志文件刪除重新啟動即可。
集群啟停腳本
腳本如下,
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------啟動 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -
daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac
添加執(zhí)行權(quán)限
chmod +x kf.sh
啟動集群命令
kf.sh start
停止集群命令
kf.sh stop
Kafka命令行操作
Kafka基礎(chǔ)架構(gòu)
主題命令行操作
查看操作主題命令參數(shù) ./bin/kafka-topics.sh
查看當前服務(wù)器中的所有topic ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
創(chuàng)建 first topic ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic first
選項說明:
–topic 定義 topic 名–replication-factor 定義副本數(shù)–partitions 定義分區(qū)數(shù) 查看 first 主題的詳情 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
修改分區(qū)數(shù)(注意:分區(qū)數(shù)只能增加,不能減少) ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 3
查看結(jié)果: ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
Topic: first TopicId: _Pjhmn1NTr6ufGufcnsw5A PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: first Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: first Partition: 2 Leader: 0 Replicas: 0 Isr: 0
刪除 topic ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first
生產(chǎn)者命令行操作
查看操作者命令參數(shù) ./bin/kafka-console-producer.sh
發(fā)送消息 ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
>hello world
>yooome yooome
消費者命令行操作
查看操作消費者命令參數(shù) ./bin/kafka-console-consumer.sh
消費消息
消費first 主題中的數(shù)據(jù): ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
把主題中所有的數(shù)據(jù)都讀取出來(包括歷史數(shù)據(jù))。 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
kafka可視化工具
官網(wǎng):https://www.kafkatool.com/download.html
Kafka重要概念
broker
一個Kafka的集群通常由多個broker組成,這樣才能實現(xiàn)負載均衡、以及容錯broker是無狀態(tài)(Sateless)的,它們是通過ZooKeeper來維護集群狀態(tài)一個Kafka的broker每秒可以處理數(shù)十萬次讀寫,每個broker都可以處理TB消息而不影響性能
zookeeper
ZK用來管理和協(xié)調(diào)broker,并且存儲了Kafka的元數(shù)據(jù)(例如:有多少topic、partition、consumer) ZK服務(wù)主要用于通知生產(chǎn)者和消費者Kafka集群中有新的broker加入、或者Kafka集群中出現(xiàn)故障的broker。 Kafka正在逐步想辦法將ZooKeeper剝離,維護兩套集群成本較高,社區(qū)提出KIP-500就是要替換掉ZooKeeper的依賴?!癒afka on Kafka”——Kafka自己來管理自己的元數(shù)據(jù)
producer(生產(chǎn)者)
生產(chǎn)者負責(zé)將數(shù)據(jù)推送給broker的topic
consumer(消費者)
消費者負責(zé)從broker的topic中拉取數(shù)據(jù),并自己進行處理
consumer group(消費者組)
consumer group是kafka提供的可擴展且具有容錯性的消費者機制一個消費者組可以包含多個消費者一個消費者組有一個唯一的ID(group Id)組內(nèi)的消費者一起消費主題的所有分區(qū)數(shù)據(jù)
分區(qū)(Partitions)
在Kafka集群中,主題被分為多個分區(qū)
副本(Replicas)
副本可以確保某個服務(wù)器出現(xiàn)故障時,確保數(shù)據(jù)依然可用,在Kafka中,一般都會設(shè)計副本的個數(shù)>1,
主題(Topic)
主題是一個邏輯概念,用于生產(chǎn)者發(fā)布數(shù)據(jù),消費者拉取數(shù)據(jù)Kafka中的主題必須要有標識符,而且是唯一的,Kafka中可以有任意數(shù)量的主題,沒有數(shù)量上的限制在主題中的消息是有結(jié)構(gòu)的,一般一個主題包含某一類消息一旦生產(chǎn)者發(fā)送消息到主題中,這些消息就不能被更新(更改)
偏移量(offset)
offset記錄著下一條將要發(fā)送給Consumer的消息的序號默認Kafka將offset存儲在ZooKeeper中在一個分區(qū)中,消息是有順序的方式存儲著,每個在分區(qū)的消費都是有一個遞增的id。這個就是偏移量offset偏移量在分區(qū)中才是有意義的。在分區(qū)之間,offset是沒有任何意義的
消費者組
Kafka支持有多個消費者同時消費一個主題中的數(shù)據(jù)。 同時運行兩個消費者,我們發(fā)現(xiàn),只有一個消費者程序能夠拉取到消息。想要讓兩個消費者同時消費消息,必須要給test主題,添加一個分區(qū)。 設(shè)置 test topic為2個分區(qū)bin/kafka-topics.sh --zookeeper 192.168.88.100:2181 -alter --partitions 2 --topic test
Kafka生產(chǎn)者
生產(chǎn)者消息發(fā)送流程
發(fā)送原理
在消息發(fā)送的過程中,涉及到了兩個線程 — main 線程和Sender線程。在main線程中創(chuàng)建了一個雙端隊列 RecordAccumulator。main線程將消息發(fā)送給ResordAccumlator,Sender線程不斷從 RecordAccumulator 中拉去消息發(fā)送到 Kafka Broker。
生產(chǎn)者重要參數(shù)列表
異步發(fā)送API
普通異步發(fā)送
需求:創(chuàng)建Kafka生產(chǎn)者,采用異步的方式發(fā)送到Kafka Broker。
2、代碼編程go get github.com/Shopify/sarama
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 發(fā)送完數(shù)據(jù)需要leader和follow都確認
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition
config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回
// 構(gòu)造一個消息
msg := &sarama.ProducerMessage{}
msg.Topic = "first"
msg.Value = sarama.StringEncoder("this is a test log")
// 連接kafka
client, err := sarama.NewSyncProducer([]string{
"192.168.71.128:9092", "192.168.71.129:9092", "192.168.71.130:9092",
}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
} else {
fmt.Println(client)
}
defer client.Close()
// 發(fā)送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
帶回調(diào)函數(shù)的異步發(fā)送
【注意:】消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試。
同步發(fā)送API
生產(chǎn)者分區(qū)
分區(qū)和副本機制
生產(chǎn)者寫入消息到topic,Kafka將依據(jù)不同的策略將數(shù)據(jù)分配到不同的分區(qū)中
輪詢分區(qū)策略隨機分區(qū)策略按key分區(qū)分配策略自定義分區(qū)策略
分區(qū)好處
便于合理使用存儲資源,每個Partition在一個Broker上存儲,可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)存儲在多臺Broker上。合理控制分區(qū)的任務(wù),可以實現(xiàn)負載均衡的效果。 提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費者可以以分區(qū)為單位進行 消費數(shù)據(jù)。
輪詢策略
默認的策略,也是使用最多的策略,可以最大限度保證所有消息平均分配到一個分區(qū)如果在生產(chǎn)消息時,key為null,則使用輪詢算法均衡地分配分區(qū)
隨機策略(不用)
隨機策略,每次都隨機地將消息分配到每個分區(qū)。在較早的版本,默認的分區(qū)策略就是隨機策略,也是為了將消息均衡地寫入到每個分區(qū)。但后續(xù)輪詢策略表現(xiàn)更佳,所以基本上很少會使用隨機策略。
按key分配策略
按key分配策略,有可能會出現(xiàn)「數(shù)據(jù)傾斜」,例如:某個key包含了大量的數(shù)據(jù),因為key值一樣,所有所有的數(shù)據(jù)將都分配到一個分區(qū)中,造成該分區(qū)的消息數(shù)量遠大于其他的分區(qū)。
亂序問題
輪詢策略、隨機策略都會導(dǎo)致一個問題,生產(chǎn)到Kafka中的數(shù)據(jù)是亂序存儲的。而按key分區(qū)可以一定程度上實現(xiàn)數(shù)據(jù)有序存儲——也就是局部有序,但這又可能會導(dǎo)致數(shù)據(jù)傾斜,所以在實際生產(chǎn)環(huán)境中要結(jié)合實際情況來做取舍。
副本機制
副本的目的就是冗余備份,當某個Broker上的分區(qū)數(shù)據(jù)丟失時,依然可以保障數(shù)據(jù)可用。因為在其他的Broker上的副本是可用的。
producer的ACKs參數(shù)
對副本關(guān)系較大的就是,producer配置的acks參數(shù)了,acks參數(shù)表示當生產(chǎn)者生產(chǎn)消息的時候,寫入到副本的要求嚴格程度。它決定了生產(chǎn)者如何在性能和可靠性之間做取舍。
acks配置為0
acks配置為1
當生產(chǎn)者的ACK配置為1時,生產(chǎn)者會等待leader副本確認接收后,才會發(fā)送下一條數(shù)據(jù),性能中等。
acks配置為-1或者all
Kafka生產(chǎn)者冪等性與事務(wù)
冪等性
拿http舉例來說,一次或多次請求,得到地響應(yīng)是一致的(網(wǎng)絡(luò)超時等問題除外),換句話說,就是執(zhí)行多次操作與執(zhí)行一次操作的影響是一樣的。
如果,某個系統(tǒng)是不具備冪等性的,如果用戶重復(fù)提交了某個表格,就可能會造成不良影響。例如:用戶在瀏覽器上點擊了多次提交訂單按鈕,會在后臺生成多個一模一樣的訂單。
Kafka生產(chǎn)者冪等性
在生產(chǎn)者生產(chǎn)消息時,如果出現(xiàn)retry時,有可能會一條消息被發(fā)送了多次,如果Kafka不具備冪等性的,就有可能會在partition中保存多條一模一樣的消息。
冪等性原理
為了實現(xiàn)生產(chǎn)者的冪等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。
PID:每個Producer在初始化時,都會分配一個唯一的PID,這個PID對用戶來說,是透明的。Sequence Number:針對每個生產(chǎn)者(對應(yīng)PID)發(fā)送到指定主題分區(qū)的消息都對應(yīng)一個從0開始遞增的Sequence Number。冪等性只能保證的是在單分區(qū)單會話內(nèi)不重復(fù)
Kafka事務(wù)
Kafka事務(wù)是2017年Kafka 0.11.0.0引入的新特性。類似于數(shù)據(jù)庫的事務(wù)。Kafka事務(wù)指的是生產(chǎn)者生產(chǎn)消息以及消費者提交offset的操作可以在一個原子操作中,要么都成功,要么都失敗。尤其是在生產(chǎn)者、消費者并存時,事務(wù)的保障尤其重要。(consumer-transform-producer模式) 開啟事務(wù),必須開啟冪等性
事務(wù)操作API
Producer接口中定義了以下5個事務(wù)相關(guān)方法:
initTransactions(初始化事務(wù)):要使用Kafka事務(wù),必須先進行初始化操作beginTransaction(開始事務(wù)):啟動一個Kafka事務(wù)sendOffsetsToTransaction(提交偏移量):批量地將分區(qū)對應(yīng)的offset發(fā)送到事務(wù)中,方便后續(xù)一塊提交commitTransaction(提交事務(wù)):提交事務(wù)abortTransaction(放棄事務(wù)):取消事務(wù)
數(shù)據(jù)有序和數(shù)據(jù)亂序
Kafka Broker
Zookeeper存儲的Kafka信息
[zk: localhost:2181(CONNECTING) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
Kafka Broker總體工作流程
Broker重要參數(shù)
Kafka副本
副本基本信息
Kafka副本作用:提高數(shù)據(jù)可靠性。Kafka默認副本1個,生產(chǎn)環(huán)境一般配置為2個,保證數(shù)據(jù)可靠性;太多副本會增加磁盤存儲空間,增加網(wǎng)絡(luò)上數(shù)據(jù)傳輸,降低效率。Kafka中副本為:Leader和Follower。Kafka生產(chǎn)者只會把數(shù)據(jù)發(fā)往 Leader,然后Follower 找 Leader 進行同步數(shù)據(jù)。Kafka 分區(qū)中的所有副本統(tǒng)稱為 AR(Assigned Repllicas)。
AR = ISR + OSR
ISR:表示 Leader 保持同步的 Follower 集合。如果 Follower 長時間未 向 Leader 發(fā)送通信請求或同步數(shù)據(jù),則該 Follower 將被踢出 ISR。該時間閾值由 replica.lag.time.max.ms 參數(shù)設(shè)定,默認 30s 。Leader 發(fā)生故障之后,就會從 ISR 中選舉新的 Leader。
OSR:表示 Follower 與 Leader 副本同步時,延遲過多的副本。
Leader 選舉流程
Kafka 集群中有一個 broker 的 Controller 會被選舉為 Controller Leader ,負責(zé)管理集群 broker 的上下線,所有 topic 的分區(qū)副本分配 和 Leader 選舉等工作。
創(chuàng)建一個新的 topic,4 個分區(qū),4 個副本
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4
Created topic atguigu1.
查看 Leader 分布情況
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3
停止掉 hadoop105 的 kafka 進程,并查看 Leader 分區(qū)情況
[atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0
停止掉 hadoop104 的 kafka 進程,并查看 Leader 分區(qū)情況
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0
啟動 hadoop105 的 kafka 進程,并查看 Leader 分區(qū)情況
[atguigu@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3
啟動 hadoop104 的 kafka 進程,并查看 Leader 分區(qū)情況
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2
停止掉 hadoop103 的 kafka 進程,并查看 Leader 分區(qū)情況
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2
Topic: atguigu1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,2
Leader 和 Follower 故障處理細節(jié)
LEO(Log End Offset): 每個副本的最后一個offset,LEO其實就是最新的 offset + 1。
HW(High Watermark):所有副本中最小的LEO。
LEO(Log End Offset):每個副本的最后一個offset,LEO其實就是最新的offset + 1
HW(High Watermark):所有副本中最小的LEO
活動調(diào)整分區(qū)副本存儲
在生產(chǎn)環(huán)境中,每臺服務(wù)器的配置和性能不一致,但是kafka只會根據(jù)自己的代碼規(guī)則創(chuàng)建對應(yīng)的分區(qū)副本,就會導(dǎo)致個別服務(wù)器存儲壓力較大。所有需要手動調(diào)整分區(qū)副本的存儲。
需求:創(chuàng)建一個新的 topic ,4個分區(qū),兩個副本,名稱為three 。將該 topic 的所有副本都存儲到 broker0 和 broker1 兩臺服務(wù)器上。
手動調(diào)整分區(qū)副本存儲的步驟如下:
創(chuàng)建一個新的 topic,名稱為 three。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --create --partitions 4 --replication-factor 2 --
topic three
查看分區(qū)副本存儲情況
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --describe --topic three
創(chuàng)建副本存儲計劃(所有副本都指定存儲在 broker0、broker1 中)。
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
輸入如下內(nèi)容:
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}
執(zhí)行副本存儲計劃。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
bootstrap-server hadoop102:9092 --reassignment-json-file
increase-replication-factor.json --execute
驗證副本存儲計劃。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
bootstrap-server hadoop102:9092 --reassignment-json-file
increase-replication-factor.json --verify
查看分區(qū)副本存儲情況。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --describe --topic three
Leader Partition 負載平衡
正常情況下,Kafka本身會自動把Leader Partition均勻分散在各個機器上,來保證每臺機器的讀寫吞吐量都是均勻的。但是如果某 些broker宕機,會導(dǎo)致Leader Partition過于集中在其他少部分幾臺broker上,這會導(dǎo)致少數(shù)幾臺broker的讀寫請求壓力過高,其他宕機的broker重啟之后都是follower partition,讀寫請求很低,造成集群負載不均衡。
參數(shù)名稱描述auto.leader.rebalance.enable默認是 true。 自動 Leader Partition 平衡。生產(chǎn)環(huán)境中,leader 重選舉的代價比較大,可能會帶來性能影響,建議設(shè)置為 false 關(guān)閉。leader.imbalance.per.broker.percentage默認是 10%。每個 broker 允許的不平衡的 leader的比率。如果每個 broker 超過了這個值,控制器會觸發(fā) leader 的平衡。leader.imbalance.check.interval.seconds默認值 300 秒。檢查 leader 負載是否平衡的間隔時間。
文件存儲
Topic 數(shù)據(jù)的存儲機制
查看 hadoop102(或者 hadoop103、hadoop104)的/opt/module/kafka/datas/first-1 (first-0、first-2)路徑上的文件
[atguigu@hadoop104 first-1]$ ls
00000000000000000092.index
00000000000000000092.log
00000000000000000092.snapshot
00000000000000000092.timeindex
leader-epoch-checkpoint
partition.metadata
直接查看 log 日志,發(fā)現(xiàn)是亂碼。
通過工具查看 index 和 log 信息。
[atguigu@hadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments
--files ./00000000000000000000.index
Dumping ./00000000000000000000.index
offset: 3 position: 152
日志存儲參數(shù)配置
參數(shù)描述log.segment.bytesKafka 中 log 日志是分成一塊塊存儲的,此配置是指 log 日志劃分成塊的大小,默認值 1G。log.index.interval.bytes默認 4kb,kafka 里面每當寫入了 4kb 大小的日志(.log),然后就往 index 文件里面記錄一個索引。 稀疏索引。
文件清理策略
Kafka 中默認的日志保存時間為 7 天,可以通過調(diào)整如下參數(shù)修改保存時間。
Log.retention.hours,最低優(yōu)先級小時,默認7天。log.retention.minutes,分鐘。log.retention.ms,最高優(yōu)先級毫秒。log.retention.check.interval.ms,負責(zé)設(shè)置檢查周期,默認 5 分鐘。
那么日志一旦超過了設(shè)置的時間,怎么處理呢?
Kafka 中提供的日志清理策略有 delete 和 compact 兩種。
delete 日志闡述:將過期數(shù)據(jù)刪除
log.cleanup.policy = delete 所有數(shù)據(jù)啟用闡述策略
(1) 基于時間:默認打開。以 segment 中所有記錄中的最大時間戳作為該文件時間戳。
(2) 基于大?。耗J關(guān)閉。超過設(shè)置的所有日志總大小,闡述最早的 segment 。
log.retention.bytes,默認等于-1,表示無窮大。
compact 日志壓縮
compact日志壓縮:對于相同 key 的不同 value 值,值保留最后一個版本。
log.cleanup.policy = compact所有數(shù)據(jù)啟動壓縮策略
壓縮后的offset可能是不連續(xù)的,比如上圖中沒有6,當從這些offset消費消息時,將會拿到比這個 offset 大的 offset 對應(yīng)的消息,實際上會拿到 offset 為 7 的消息,并從這個位置開始消費。
? 這種策略只適合特殊場景,比如消息的 key 是用戶 ID,value 是用戶的資料,通過這種壓縮策略,整個消息集里就保存了所有用戶最新的資料。
Kafka 消費者
Kafka 消費方式
pull(拉)模式:consumer 采用從 broker 中主動拉去數(shù)據(jù)。Kafka 采用這種方式。push(推)模式:Kafka沒有采用這種方式,因為由 broker 決定消息發(fā)送速率,很難適應(yīng)所有消費者的消費速率。例如推送的速度是 50m/s,Consumer1,Consumer2就來不及處理消息。
pull 模式不足之處是,如果Kafka 沒有數(shù)據(jù),消費者可能會陷入循環(huán)中,一直返回空數(shù)據(jù)。
Kafka 消費者工作流程
消費者組原理
Consumer Group (CG):消費者組,由多個consumer組成。形成一個消費者組的條件是所有消費者的 groupid 相同。
消費者組內(nèi)每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)消費者消費。消費者組之間互不影響。所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。
消費者重要參數(shù)
參數(shù)名稱描述bootstrap.servers向 Kafka 集群建立初始連接用到的 host/port 列表。key.deserializer 和value.deserializer指定接收消息的 key 和 value 的反序列化類型。一定要寫全類名。group.id標記消費者所屬的消費者組。enable.auto.commit默認值為 true,消費者會自動周期性地向服務(wù)器提交偏移量。auto.commit.interval.ms如果設(shè)置了 enable.auto.commit 的值為 true, 則該值定義了消費者偏移量向 Kafka 提交的頻率,默認 5s。auto.offset.reset當 Kafka 中沒有初始偏移量或當前偏移量在服務(wù)器中不存在(如,數(shù)據(jù)被刪除了),該如何處理? earliest:自動重置偏移量到最早的偏移量。 latest:默認,自動重置偏移量為最新的偏移量。 none:如果消費組原來的(previous)偏移量不存在,則向消費者拋異常。 anything:向消費者拋異常。offsets.topic.num.partitions__consumer_offsets 的分區(qū)數(shù),默認是 50 個分區(qū)。heartbeat.interval.msKafka 消費者和 coordinator 之間的心跳時間,默認 3s。該條目的值必須小于 session.timeout.ms ,也不應(yīng)該高于session.timeout.ms 的 1/3。session.timeout.msKafka 消費者和 coordinator 之間連接超時時間,默認 45s。超過該值,該消費者被移除,消費者組執(zhí)行再平衡。max.poll.interval.ms消費者處理消息的最大時長,默認是 5 分鐘。超過該值,該消費者被移除,消費者組執(zhí)行再平衡。fetch.min.bytes默認 1 個字節(jié)。消費者獲取服務(wù)器端一批消息最小的字節(jié)數(shù)。fetch.max.wait.ms默認 500ms。如果沒有從服務(wù)器端獲取到一批數(shù)據(jù)的最小字節(jié)數(shù)。該時間到,仍然會返回數(shù)據(jù)。fetch.max.bytes默認 Default: 52428800(50 m)。消費者獲取服務(wù)器端一批消息最大的字節(jié)數(shù)。如果服務(wù)器端一批次的數(shù)據(jù)大于該值(50m)仍然可以拉取回來這批數(shù)據(jù),因此,這不是一個絕對最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影響。max.poll.records一次 poll 拉取數(shù)據(jù)返回消息的最大條數(shù),默認是 500 條。
offset 位移
offset 的默認維護位置
自動提交offset
為了使我們能夠?qū)W⒂谧约旱臉I(yè)務(wù)邏輯,Kafka提供了自動提交offset的功能。
自動提交offset的相關(guān)參數(shù):
enable.auto.commit:是否開啟自動提交offset功能,默認是true auto.commit.interval.ms:自動提交offset的時間間隔,默認是5s
參數(shù)名稱描述enable.auto.commit默認值為 true,消費者會自動周期性地向服務(wù)器提交偏移量。auto.commit.interval.ms如果設(shè)置了 enable.auto.commit 的值為 true, 則該值定義了消費者偏移量向 Kafka 提交的頻率,默認 5s。
手動提交offset
雖然自動提交offset十分簡單比那里,但由于其是基于時間提交的,開發(fā)人員難以把握 offset 提交的時機。一次 Kafka 還提供了手動提交 offset 的API。
手動提交 offset 的方法有兩種:分別是 commitSync(同步提交)和commitAsync(異步提交)。兩者的相同點是,都會將本次提交的一批數(shù)據(jù)最高的偏移量提交;不同點是,同步提交阻塞當前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導(dǎo)致,也會出現(xiàn)提交失?。?;而異步提交則沒有失敗重試機制,故有可能提交失敗。
commitSync(同步提交):必須等待offset提交完畢,再去消費下一批數(shù)據(jù)。commitAsync(異步提交) :發(fā)送完提交offset請求后,就開始消費下一批數(shù)據(jù)了。
指定Offset消費
auto.offset.reset = earliest | latest | none 默認是 latest。
當 Kafka 中沒有初始偏移量(消費者組第一次消費)或服務(wù)器上不再存在當前偏移量
時(例如該數(shù)據(jù)已被刪除),該怎么辦?
(1)earliest:自動將偏移量重置為最早的偏移量,–from-beginning。
(2)latest(默認值):自動將偏移量重置為最新偏移量。
(3)none:如果未找到消費者組的先前偏移量,則向消費者拋出異常。
Kafka-Kraft模式
Kafka-Kraft架構(gòu)
左圖為 Kafka 現(xiàn)有架構(gòu),元數(shù)據(jù)在 zookeeper 中,運行時動態(tài)選舉 controller,由controller 進行 Kafka 集群管理。右圖為 kraft 模式架構(gòu)(實驗性),不再依賴 zookeeper 集群,而是用三臺 controller 節(jié)點代替 zookeeper,元數(shù)據(jù)保存在 controller 中,由 controller 直接進行 Kafka 集群管理。
這樣做的好處有以下幾個:
Kafka 不再依賴外部框架,而是能夠獨立運行; controller 管理集群時,不再需要從 zookeeper 中先讀取數(shù)據(jù),集群性能上升; 由于不依賴 zookeeper,集群擴展時不再受到 zookeeper 讀寫能力限制; controller 不再動態(tài)選舉,而是由配置文件規(guī)定。這樣我們可以有針對性的加強controller 節(jié)點的配置,而不是像以前一樣對隨機 controller 節(jié)點的高負載束手無策。
Go kafka
Kafka簡介
Kafka是分布式的:其所有的構(gòu)件borker(服務(wù)端集群)、producer(消息生產(chǎn))、consumer(消息消費者)都可以是分布式的。 可以進行分區(qū):每一個分區(qū)都是一個順序的、不可變的消息隊列, 并且可以持續(xù)的添加。 高吞吐量。
Kafka的結(jié)構(gòu)
Producer
Producer即生產(chǎn)者,消息的產(chǎn)生者,是消息的?口。
kafka cluster
kafka集群,一臺或多臺服務(wù)?組成
Broker
Broker是指部署了Kafka實例的服務(wù)?節(jié)點。
Topic
消息的主題,可以理解為消息的分類,kafka的數(shù)據(jù)就保存在topic。在每個broker上 都可以創(chuàng)建多個topic。實際應(yīng)用中通常是一個業(yè)務(wù)線建一個topic。
Partition
Topic的分區(qū),每個topic可以有多個分區(qū),分區(qū)的作用是做負載,提高kafka的吞吐量。同一個topic在不同的分區(qū)的數(shù)據(jù)是不重復(fù)的,partition的表現(xiàn)形式就是一個一個的?件夾!
Replication
每一個分區(qū)都有多個副本,副本的作用是做備胎。當主分區(qū)(Leader)故障的時候會選擇一個備胎(Follower)上位,成為Leader。
在kafka中默認副本的最大數(shù)量是10 個,且副本的數(shù)量不能大于Broker的數(shù)量,follower和leader絕對是在不同的機器,同一機?對同一個分區(qū)也只可能存放一個副本(包括自己)。
Consumer
消費者,即消息的消費方,是消息的出口。
Consumer Group
我們可以將多個消費組組成一個消費者組,在kafka的設(shè)計中同一個分區(qū)的數(shù)據(jù)只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個 topic的不同分區(qū)的數(shù)據(jù),這也是為了提高kafka的吞吐量!
Kafka?作流程
?產(chǎn)者從Kafka集群獲取分區(qū)leader信息?產(chǎn)者將消息發(fā)送給leaderleader將消息寫入本地磁盤follower從leader拉取消息數(shù)據(jù)follower將消息寫入本地磁盤后向leader發(fā)送ACKleader收到所有的follower的ACK之后向生產(chǎn)者發(fā)送ACK
選擇partition的原則(面試重點)
某個topic有多個partition,producer?怎么知道該將數(shù)據(jù)發(fā)往哪個partition?
直接指定:寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應(yīng)的partition。hash:如果沒有指定partition,但是設(shè)置了數(shù)據(jù)的key,則會根據(jù)key的值hash出一個partition。輪詢:如果既沒指定partition,又沒有設(shè)置key,則會采用輪詢?式,即每次取一小段時間的數(shù)據(jù)寫入某個partition,下一小段的時間寫入下一個partition。
ACK應(yīng)答機制(面試重點)
producer在向kafka寫入消息的時候,可以設(shè)置參數(shù)來確定是否確認kafka接收到數(shù)據(jù),這個參數(shù)可設(shè)置 的值為 0,1,all
0:代表producer往集群發(fā)送數(shù)據(jù)不需要等到集群的返回,不確保消息發(fā)送成功。安全性最低但是效 率最高。1:代表producer往集群發(fā)送數(shù)據(jù)只要leader應(yīng)答就可以發(fā)送下一條,只確保leader發(fā)送成功。all:代表producer往集群發(fā)送數(shù)據(jù)需要所有的follower都完成從leader的同步才會發(fā)送下一條,確保 leader發(fā)送成功和所有的副本都完成備份。安全性最?高,但是效率最低。
如果往不存在的topic寫數(shù)據(jù),kafka會?動創(chuàng)建topic,partition和replication的數(shù)量 默認配置都是1。
Topic和數(shù)據(jù)?志
topic 是同?類別的消息記錄(record)的集合。在Kafka中,?個主題通常有多個訂閱者。對于每個 主題,Kafka集群維護了?個分區(qū)數(shù)據(jù)?志?件結(jié)構(gòu)如下:
每個partition都是?個有序并且不可變的消息記錄集合。當新的數(shù)據(jù)寫?時,就被追加到partition的末尾。在每個partition中,每條消息都會被分配?個順序的唯?標識,這個標識被稱為offset,即偏移 量。Kafka只保證在同?個partition內(nèi)部消息是有序的,在不同partition之間,并不能保證消息有序。
Kafka可以配置?個保留期限,?來標識?志會在Kafka集群內(nèi)保留多?時間。Kafka集群會保留在保留期限內(nèi)所有被發(fā)布的消息,不管這些消息是否被消費過。
?如保留期限設(shè)置為兩天,那么數(shù)據(jù)被發(fā)布到 Kafka集群的兩天以內(nèi),所有的這些數(shù)據(jù)都可以被消費。當超過兩天,這些數(shù)據(jù)將會被清空,以便為后 續(xù)的數(shù)據(jù)騰出空間。
由于Kafka會將數(shù)據(jù)進?持久化存儲(即寫?到硬盤上),所以保留的數(shù)據(jù)??可 以設(shè)置為?個?較?的值。
Partition結(jié)構(gòu)
Partition在服務(wù)器上的表現(xiàn)形式就是?個?個的?件夾,每個partition的?件夾下?會有多組segment ?件,每組segment?件?包含 .index ?件、 .log ?件、 .timeindex ?件三個?件,其中 .log ?件就是實際存儲message的地?,? .index 和 .timeindex ?件為索引?件,?于檢索消息。
消費數(shù)據(jù)
多個消費者實例可以組成?個消費者組,并??個標簽來標識這個消費者組。?個消費者組中的不同消費者實例可以運?在不同的進程甚?不同的服務(wù)器上。 如果所有的消費者實例都在不同的消費者組,那么每?條消息記錄會被?播到每?個消費者實例。 在同?個消費者組中,每個消費者實例可以消費多個分區(qū),但是每個分區(qū)最多只能被消費者組中的?個實例消費。
kafka環(huán)境搭建
kafka環(huán)境基于zookeeper,zookeeper環(huán)境基于JAVA-JDK。
?。?!新版本的kafka自帶zookeeper,可以不手動安裝。
java環(huán)境變量
https://www.oracle.com/technetwork/java/javase/downloads/jdk12-downloads-5295953.html
安裝kafka
http://kafka.apache.org/downloads
1.打開config目錄下的server.properties文件
2.修改log.dirs=F:/tmp/kafka-logs //日志存放
3.打開config目錄下的zookeeper.properties文件
4.修改dataDir=F:/tmp/zookeeper //數(shù)據(jù)存放
啟動:
先執(zhí)行:bin\windows\zookeeper-server-start.bat config\zookeeper.properties
再執(zhí)行:bin\windows\kafka-server-start.bat config\server.properties
zookeeper: kafka:
GO操作Kafka
sarama操作kafka
依賴安裝
go get github.com/Shopify/sarama
windows: mod文件中手動加 require github.com/shopify/sarama v1.19.0
Go語言中連接kafka使用第三方庫:github.com/IBM/sarama。
go get github.com/IBM/sarama這個庫已經(jīng)由Shopify轉(zhuǎn)給了IBM。
sarama v1.20之后的版本加入了zstd壓縮算法,需要用到cgo,在Windows平臺編譯時會提示類似如下錯誤:
# github.com/DataDog/zstd
exec: "gcc":executable file not found in %PATH%
所以在Windows平臺請使用v1.19版本的sarama。
連接kafka發(fā)送消息
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// 基于sarama第三方庫開發(fā)的kafka client
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 發(fā)送完數(shù)據(jù)需要leader和follow都確認
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition
config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回
// 構(gòu)造一個消息
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("this is a test log")
// 連接kafka
client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
// 發(fā)送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
連接kafka消費消息
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// kafka consumer
func main() {
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
partitionList, err := consumer.Partitions("web_log") // 根據(jù)topic取到所有的分區(qū)
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList { // 遍歷所有的分區(qū)
// 針對每個分區(qū)創(chuàng)建一個對應(yīng)的分區(qū)消費者
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
// 異步從每個分區(qū)消費信息
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
}
}(pc)
}
}
kafka-go操作kafka
相較于sarama, kafka-go 更簡單、更易用。 segmentio/kafka-go 是純Go實現(xiàn),提供了與kafka交互的低級別和高級別兩套API,同時也支持Context。 此外社區(qū)中另一個比較常用的confluentinc/confluent-kafka-go,它是一個基于cgo的librdkafka包裝,在項目中使用它會引入對C庫的依賴。
準備Kafka環(huán)境
以下docker-compose.yml文件用來搭建一套單節(jié)點zookeeper和單節(jié)點kafka環(huán)境,并且在8080端口提供kafka-ui管理界面。
version: '2.1'
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka1
environment:
DYNAMIC_CONFIG_ENABLED: "TRUE"
將上述docker-compose.yml文件在本地保存,在同一目錄下執(zhí)行以下命令啟動容器。
docker-compose up -d
容器啟動后,使用瀏覽器打開127.0.0.1:8080 即可看到如下kafka-ui界面。
安裝kafka-go
執(zhí)行以下命令下載 kafka-go依賴。
go get github.com/segmentio/kafka-go
注意:kafka-go 需要 Go 1.15或更高版本。
kafka-go 提供了兩套與Kafka交互的API。
低級別( low-level):基于與 Kafka 服務(wù)器的原始網(wǎng)絡(luò)連接實現(xiàn)。
高級別(high-level):對于常用讀寫操作封裝了一套更易用的API。
通常建議直接使用高級別的交互API。
Connection
Conn 類型是 kafka-go 包的核心。它代表與 Kafka broker之間的連接?;谒鼘崿F(xiàn)了一套與Kafka交互的低級別 API。
發(fā)送消息
下面是連接至Kafka之后,使用Conn發(fā)送消息的代碼示例。
// writeByConn 基于Conn發(fā)送消息
func writeByConn() {
topic := "my-topic"
partition := 0
// 連接至Kafka集群的Leader節(jié)點
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
// 設(shè)置發(fā)送消息的超時時間
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
// 發(fā)送消息
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
// 關(guān)閉連接
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
消費消息
// readByConn 連接至kafka后接收消息
func readByConn() {
// 指定要連接的topic和partition
topic := "my-topic"
partition := 0
// 連接至Kafka的leader節(jié)點
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
// 設(shè)置讀取超時時間
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
// 讀取一批消息,得到的batch是一系列消息的迭代器
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
// 遍歷讀取消息
b := make([]byte, 10e3) // 10KB max per message
for {
n, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b[:n]))
}
// 關(guān)閉batch
if err := batch.Close(); err != nil {
log.Fatal("failed to close batch:", err)
}
// 關(guān)閉連接
if err := conn.Close(); err != nil {
log.Fatal("failed to close connection:", err)
}
}
使用batch.Read更高效一些,但是需要根據(jù)消息長度選擇合適的buffer(上述代碼中的b),如果傳入的buffer太小(消息裝不下)就會返回io.ErrShortBuffer錯誤。
如果不考慮內(nèi)存分配的效率問題,也可以按以下代碼使用batch.ReadMessage讀取消息。
for {
msg, err := batch.ReadMessage()
if err != nil {
break
}
fmt.Println(string(msg.Value))
}
創(chuàng)建topic
當Kafka關(guān)閉自動創(chuàng)建topic的設(shè)置時,可按如下方式創(chuàng)建topic。
// createTopicByConn 創(chuàng)建topic
func createTopicByConn() {
// 指定要創(chuàng)建的topic名稱
topic := "my-topic"
// 連接至任意kafka節(jié)點
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
// 獲取當前控制節(jié)點信息
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
// 連接至leader節(jié)點
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
// 創(chuàng)建topic
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
}
通過非leader節(jié)點連接leader節(jié)點
下面的示例代碼演示了如何通過已有的非leader節(jié)點的Conn,連接至 leader節(jié)點。
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
// 獲取當前控制節(jié)點信息
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer connLeader.Close()
獲取topic列表
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
panic(err.Error())
}
m := map[string]struct{}{}
// 遍歷所有分區(qū)取topic
for _, p := range partitions {
m[p.Topic] = struct{}{}
}
for k := range m {
fmt.Println(k)
}
Reader
Reader是由 kafka-go 包提供的另一個概念,對于從單個主題-分區(qū)(topic-partition)消費消息這種典型場景,使用它能夠簡化代碼。Reader 還實現(xiàn)了自動重連和偏移量管理,并支持使用 Context 支持異步取消和超時的 API。
注意: 當進程退出時,必須在 Reader 上調(diào)用 Close() 。Kafka服務(wù)器需要一個優(yōu)雅的斷開連接來阻止它繼續(xù)嘗試向已連接的客戶端發(fā)送消息。如果進程使用 SIGINT (shell 中的 Ctrl-C)或 SIGTERM (如 docker stop 或 kubernetes start)終止,那么下面給出的示例不會調(diào)用 Close()。當同一topic上有新Reader連接時,可能導(dǎo)致延遲(例如,新進程啟動或新容器運行)。在這種場景下應(yīng)使用signal.Notify處理程序在進程關(guān)閉時關(guān)閉Reader。
消費消息
下面的代碼演示了如何使用Reader連接至Kafka消費消息。
// readByReader 通過Reader接收消息
func readByReader() {
// 創(chuàng)建Reader
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "topic-A",
Partition: 0,
MaxBytes: 10e6, // 10MB
})
r.SetOffset(42) // 設(shè)置Offset
// 接收消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}
// 程序退出前關(guān)閉Reader
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
}
消費者組
kafka-go支持消費者組,包括broker管理的offset。要啟用消費者組,只需在 ReaderConfig 中指定 GroupID。
使用消費者組時,ReadMessage 會自動提交偏移量。
// 創(chuàng)建一個reader,指定GroupID,從 topic-A 消費消息
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id", // 指定消費者組id
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
})
// 接收消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
// 程序退出前關(guān)閉Reader
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
在使用消費者組時會有以下限制:
(*Reader).SetOffset 當設(shè)置了GroupID時會返回錯誤(*Reader).Offset 當設(shè)置了GroupID時會永遠返回 -1(*Reader).Lag 當設(shè)置了GroupID時會永遠返回 -1(*Reader).ReadLag 當設(shè)置了GroupID時會返回錯誤(*Reader).Stats 當設(shè)置了GroupID時會返回一個-1的分區(qū)
顯式提交
kafka-go 也支持顯式提交。當需要顯式提交時不要調(diào)用 ReadMessage,而是調(diào)用 FetchMessage獲取消息,然后調(diào)用 CommitMessages 顯式提交。
ctx := context.Background()
for {
// 獲取消息
m, err := r.FetchMessage(ctx)
if err != nil {
break
}
// 處理消息
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
// 顯式提交
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("failed to commit messages:", err)
}
}
在消費者組中提交消息時,具有給定主題/分區(qū)的最大偏移量的消息確定該分區(qū)的提交偏移量的值。例如,如果通過調(diào)用 FetchMessage 獲取了單個分區(qū)的偏移量為 1、2 和 3 的消息,則使用偏移量為3的消息調(diào)用 CommitMessages 也將導(dǎo)致該分區(qū)的偏移量為 1 和 2 的消息被提交。
管理提交間隔
默認情況下,調(diào)用CommitMessages將同步向Kafka提交偏移量。為了提高性能,可以在ReaderConfig中設(shè)置CommitInterval來定期向Kafka提交偏移。
// 創(chuàng)建一個reader從 topic-A 消費消息
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
CommitInterval: time.Second, // 每秒刷新一次提交給 Kafka
})
Writer
向Kafka發(fā)送消息,除了使用基于Conn的低級API,kafka-go包還提供了更高級別的 Writer 類型。大多數(shù)情況下使用Writer即可滿足條件,它支持以下特性。
對錯誤進行自動重試和重新連接。在可用分區(qū)之間可配置的消息分布。向Kafka同步或異步寫入消息。使用Context的異步取消。關(guān)閉時清除掛起的消息以支持正常關(guān)閉。在發(fā)布消息之前自動創(chuàng)建不存在的topic。
發(fā)送消息
// 創(chuàng)建一個writer 向topic-A發(fā)送消息
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.LeastBytes{}, // 指定分區(qū)的balancer模式為最小字節(jié)分布
RequiredAcks: kafka.RequireAll, // ack模式
Async: true, // 異步
}
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
創(chuàng)建不存在的topic
如果給Writer配置了AllowAutoTopicCreation:true,那么當發(fā)送消息至某個不存在的topic時,則會自動創(chuàng)建topic。
w := &Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
AllowAutoTopicCreation: true, // 自動創(chuàng)建topic
}
messages := []kafka.Message{
{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
}
var err error
const retries = 3
// 重試3次
for i := 0; i < retries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = w.WriteMessages(ctx, messages...)
if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
time.Sleep(time.Millisecond * 250)
continue
}
if err != nil {
log.Fatalf("unexpected error %v", err)
}
break
}
// 關(guān)閉Writer
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
寫入多個topic
通常,WriterConfig.Topic用于初始化單個topic的Writer。通過去掉WriterConfig中的Topic配置,分別設(shè)置每條消息的message.topic,可以實現(xiàn)將消息發(fā)送至多個topic。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
// 注意: 當此處不設(shè)置Topic時,后續(xù)的每條消息都需要指定Topic
Balancer: &kafka.LeastBytes{},
}
err := w.WriteMessages(context.Background(),
// 注意: 每條消息都需要指定一個 Topic, 否則就會報錯
kafka.Message{
Topic: "topic-A",
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Topic: "topic-B",
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Topic: "topic-C",
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
注意:Writer中的Topic和Message中的Topic是互斥的,同一時刻有且只能設(shè)置一處。
其他配置
TLS
對于基本的 Conn 類型或在 Reader/Writer 配置中,可以在Dialer中設(shè)置TLS選項。如果 TLS 字段為空,則它將不啟用TLS 連接。
注意:不在Conn/Reder/Writer上配置TLS,連接到啟用TLS的Kafka集群,可能會出現(xiàn)io.ErrUnexpectedEOF錯誤。
Connection
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls config...}, // 指定TLS配置
}
conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls config...}, // 指定TLS配置
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
})
Writer
創(chuàng)建Writer時可以按如下方式指定TLS配置。
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: &kafka.Transport{
TLS: &tls.Config{}, // 指定TLS配置
},
}
SASL
可以在Dialer上指定一個選項以使用SASL身份驗證。Dialer可以直接用來打開一個 Conn,也可以通過它們各自的配置傳遞給一個 Reader 或 Writer。如果 SASLMechanism字段為 nil,則不會使用 SASL 進行身份驗證。
SASL 身份驗證類型
明文
mechanism := plain.Mechanism{
Username: "username",
Password: "password",
}
SCRAM
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
Connection
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
})
Writer
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
// Transport 負責(zé)管理連接池和其他資源,
// 通常最好的使用方式是創(chuàng)建后在應(yīng)用程序中共享使用它們。
sharedTransport := &kafka.Transport{
SASL: mechanism,
}
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: sharedTransport,
}
Client
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
// Transport 負責(zé)管理連接池和其他資源,
// 通常最好的使用方式是創(chuàng)建后在應(yīng)用程序中共享使用它們。
sharedTransport := &kafka.Transport{
SASL: mechanism,
}
client := &kafka.Client{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Timeout: 10 * time.Second,
Transport: sharedTransport,
}
Balancer
kafka-go實現(xiàn)了多種負載均衡策略。特別是當你從其他Kafka庫遷移過來時,你可以按如下說明選擇合適的Balancer實現(xiàn)。
Sarama 如果從 sarama 切換過來,并且需要/希望使用相同的算法進行消息分區(qū),則可以使用kafka.Hash或kafka.ReferenceHash。
kafka.Hash = sarama.NewHashPartitioner
kafka.ReferenceHash = sarama.NewReferenceHashPartitioner
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
}
librdkafka和confluent-kafka-go:kafka.CRC32Balancer與librdkafka默認的consistent_random策略表現(xiàn)一致。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: kafka.CRC32Balancer{},
}
Java:使用kafka.Murmur2Balancer可以獲得與默認Java客戶端相同的策略。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: kafka.Murmur2Balancer{},
}
Compression
可以通過設(shè)置Compression字段在Writer上啟用壓縮:
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Compression: kafka.Snappy,
}
Reader 將通過檢查消息屬性來確定消費的消息是否被壓縮。
Logging
想要記錄Reader/Writer類型的操作,可以在創(chuàng)建時配置日志記錄器。
kafka-go中的Logger是一個接口類型。
type Logger interface {
Printf(string, ...interface{})
}
并且提供了一個LoggerFunc類型,幫我們實現(xiàn)了Logger接口。
type LoggerFunc func(string, ...interface{})
func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }
Reader:借助kafka.LoggerFunc我們可以自定義一個Logger。
// 自定義一個Logger
func logf(msg string, a ...interface{}) {
fmt.Printf(msg, a...)
fmt.Println()
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "q1mi-topic",
Partition: 0,
Logger: kafka.LoggerFunc(logf),
ErrorLogger: kafka.LoggerFunc(logf),
})
Writer:也可以直接使用第三方日志庫,例如下面示例代碼中使用了zap日志庫。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "q1mi-topic",
Logger: kafka.LoggerFunc(zap.NewExample().Sugar().Infof),
ErrorLogger: kafka.LoggerFunc(zap.NewExample().Sugar().Errorf),
}
FAQ
Kafka中的消費者組(Consumer Group)的理解
Kafka中的消費者組(Consumer Group)是一個非常重要的概念,涉及到消息如何被消費和分發(fā)。讓我們來詳細解釋一下。
消費者組的概念
消費者組 是Kafka中一組協(xié)作消費同一個Topic的消費者。組內(nèi)的消費者共享同一個Group ID。Partition 是Kafka中Topic的基本單元,每個Topic可以有一個或多個分區(qū)(Partition)。消費者組的工作方式:
Kafka確保每條消息在一個消費者組內(nèi)只會被消費一次。消費者組中的不同消費者可以消費Topic的不同分區(qū)(Partition)。
消費者組內(nèi)的消息分配
假設(shè)你有一個Topic names,它有多個分區(qū),并且你創(chuàng)建了一個消費者組group1,里面有3個消費者:C1、C2 和 C3。消息的消費方式將取決于Topic中的分區(qū)數(shù)量。
分區(qū)數(shù)量少于消費者數(shù)量:假如names Topic只有2個分區(qū),而你有3個消費者,Kafka會將每個分區(qū)分配給一個消費者,剩余的消費者則不會接收到消息。比如C1消費分區(qū)1的消息,C2消費分區(qū)2的消息,而C3則不會分配到任何分區(qū)。 分區(qū)數(shù)量等于消費者數(shù)量:如果names Topic有3個分區(qū),每個消費者會分到一個分區(qū),每個分區(qū)的消息只會被分配給一個消費者。例如,C1消費分區(qū)1,C2消費分區(qū)2,C3消費分區(qū)3。 分區(qū)數(shù)量多于消費者數(shù)量:比如,names Topic有6個分區(qū),而你有3個消費者,這時每個消費者會被分配到多個分區(qū)。例如,C1消費分區(qū)1和分區(qū)4,C2消費分區(qū)2和分區(qū)5,C3消費分區(qū)3和分區(qū)6。
消息分發(fā)規(guī)則
每個分區(qū)中的消息 會被分配到對應(yīng)的消費者,分區(qū)內(nèi)的消息順序是被保證的。每條消息 只會被消費者組中的一個消費者消費(在同一個組內(nèi))。
這意味著,在同一個消費者組內(nèi),每條消息只會被一個消費者處理,消費后消息即從隊列中移除。
消費者組內(nèi)的每個消費者通過分區(qū)分攤消息。一個Topic中的每個分區(qū)會被消費者組中的一個消費者消費。組內(nèi)的所有消費者共同處理所有分區(qū)的消息。
這樣設(shè)計的好處是,你可以根據(jù)消費者組的數(shù)量靈活地調(diào)整并行處理的能力,同時還能保證在同一個消費者組內(nèi)消息只被消費一次。
讓我們通過一個具體的例子來深入理解Kafka消費者組的概念及其工作方式。
實際例子
假設(shè)有一個Topic叫做 names,它有6個分區(qū)(Partition),用來存儲用戶的名字?,F(xiàn)在,你有一個消費者組 group1,并且在這個組中創(chuàng)建了3個消費者 C1、C2 和 C3。
場景1:消費者組內(nèi)的消費者數(shù)量小于分區(qū)數(shù)量
Topic: names分區(qū)數(shù)量: 6消費者組: group1消費者數(shù)量: 3 (C1, C2, C3)
在這種情況下,Kafka會將6個分區(qū)分配給這3個消費者。假設(shè)分配方式如下:
C1 消費分區(qū) P1 和 P4C2 消費分區(qū) P2 和 P5C3 消費分區(qū) P3 和 P6
因此,分區(qū) P1 中的所有消息只會被消費者 C1 消費,P2 中的消息只會被消費者 C2 消費,依此類推。每條消息在消費者組 group1 中只會被一個消費者消費。
場景2:增加消費者以提高處理能力
假設(shè)隨著業(yè)務(wù)發(fā)展,你需要更快地處理這些消息。你可以在group1中再增加3個消費者,使總消費者數(shù)量達到6個。
消費者數(shù)量: 6 (C1, C2, C3, C4, C5, C6)
Kafka會重新分配分區(qū),現(xiàn)在每個消費者消費一個分區(qū):
C1 消費 P1C2 消費 P2C3 消費 P3C4 消費 P4C5 消費 P5C6 消費 P6
這樣一來,消息處理能力就提升了,因為現(xiàn)在有6個消費者并行處理這6個分區(qū)中的消息。
場景3:消費者數(shù)量多于分區(qū)數(shù)量
如果你增加到8個消費者,而 names 這個Topic仍然只有6個分區(qū),Kafka會把這6個分區(qū)分配給前6個消費者,而剩余的兩個消費者 C7 和 C8 不會接收到任何消息,處于空閑狀態(tài)。
“根據(jù)消費者組的數(shù)量靈活地調(diào)整并行處理的能力”:
如果需要提高消息處理速度,可以增加消費者組中的消費者數(shù)量,讓更多消費者并行處理消息。但是消費者數(shù)量不應(yīng)該超過分區(qū)數(shù)量,否則有些消費者會閑置。 “一個Topic中的每個分區(qū)會被消費者組中的一個消費者消費”:
在同一個消費者組內(nèi),每個分區(qū)只能被一個消費者消費,這樣保證了分區(qū)內(nèi)消息的順序性和唯一性。不同消費者組可以同時消費同一個Topic的消息,但每個組內(nèi)的消息是獨立消費的。
這個設(shè)計提供了靈活性,允許你根據(jù)需求調(diào)整消費者數(shù)量以達到所需的吞吐量和并行處理能力。
當然可以。下面是一個簡單的Go示例代碼,展示如何使用 confluent-kafka-go 庫(這是一個流行的Kafka客戶端庫)來創(chuàng)建Kafka消費者,并且如何增加消費者來提升處理能力。
示例代碼
首先,你需要安裝 confluent-kafka-go 庫。你可以使用以下命令安裝它:
go get github.com/confluentinc/confluent-kafka-go/kafka
以下是一個簡單的Go程序,演示了如何創(chuàng)建一個Kafka消費者,并且增加消費者來處理消息。
1. 單個消費者
Kafka中的消費者組是通過消費者配置中的 group.id 參數(shù)來定義的。消費者組的概念是Kafka客戶端的配置的一部分,而不是需要顯式創(chuàng)建的實體。Kafka會自動管理消費者組的協(xié)調(diào)和分配。
這是一個單消費者的簡單示例:
package main
import (
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
// 創(chuàng)建Kafka消費者配置
config := &kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "example-group",
"auto.offset.reset": "earliest",
}
// 創(chuàng)建Kafka消費者
consumer, err := kafka.NewConsumer(config)
if err != nil {
log.Fatalf("Failed to create consumer: %s", err)
}
// 訂閱Topic
consumer.Subscribe("names", nil)
// 消費消息
for {
msg, err := consumer.ReadMessage(-1)
if err != nil {
log.Printf("Consumer error: %v", err)
continue
}
log.Printf("Message: %s", string(msg.Value))
}
}
2. 增加消費者
要增加消費者,你可以啟動多個實例的消費者程序,每個實例都使用相同的 group.id。Kafka會自動負載均衡各個消費者對Topic分區(qū)的消費。
以下是啟動多個消費者的示例:
假設(shè)你已經(jīng)將上述代碼保存為 consumer.go。你可以在不同的終端中運行多個消費者實例,模擬增加消費者:
go run consumer.go
你可以在不同的終端窗口中運行這個命令多次。例如:
# 終端 1
go run consumer.go
# 終端 2
go run consumer.go
# 終端 3
go run consumer.go
每個運行的消費者實例都會加入到 example-group 消費者組中,Kafka會將Topic的分區(qū)分配給這些消費者,確保負載均衡。
注意事項
確保你已經(jīng)正確配置了Kafka集群和Topic,并且它們正在運行。增加消費者的實際效果取決于Topic的分區(qū)數(shù)量。消費者的數(shù)量不應(yīng)超過分區(qū)的數(shù)量,否則會有消費者閑置。每個消費者實例會從Kafka中讀取消息,處理完后Kafka會將消息標記為已消費,并從Topic中移除。
這樣,你可以通過啟動多個消費者實例來提高消息處理的并發(fā)能力。
柚子快報激活碼778899分享:java kafka全解
相關(guān)文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。