柚子快報邀請碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記
柚子快報邀請碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記
Kafka
Kafka 是一個分布式的基于發(fā)布/訂閱模式的消息隊列(Message Queue),主要應(yīng)用于大數(shù)據(jù)實時處理領(lǐng)域。
Kafka可以用作Flink應(yīng)用程序的數(shù)據(jù)源。Flink可以輕松地從一個或多個Kafka主題中消費數(shù)據(jù)流。這意味著您可以使用Kafka來捕獲和傳輸實時數(shù)據(jù),并將其發(fā)送到Flink進行進一步處理。
Flink和Kafka在實時數(shù)據(jù)處理和流處理應(yīng)用程序中通常協(xié)同工作,Kafka用于數(shù)據(jù)傳輸和捕獲,而Flink用于數(shù)據(jù)處理和分析。
Kafka由 生產(chǎn)者 Broker 消費者組成,生產(chǎn)者和消費者是由Java語言編寫的,Broker由Scala語言寫的。
基礎(chǔ)架構(gòu)
Producer:kafka 生產(chǎn)者,用于接收外部數(shù)據(jù),然后將數(shù)據(jù)發(fā)送給kafka集群存儲,假如要發(fā)100T的數(shù)據(jù)。Consumer:消費者,就是到kafka中取數(shù)據(jù)的客戶端,比如:flink就是一個消費者,到kafka中取出數(shù)據(jù)計算處理。Broker:一個Broker就是一個kafka服務(wù)器,如果你在一個虛擬機上安裝了kafka,那么這個虛擬機就是一個Broker。Partition:分區(qū)。前面說了,要發(fā)送100T的數(shù)據(jù)給kafka,那如果只用一臺kafka服務(wù)器(Broker)接收肯定不好,太大了。所以就有了kafka集群一起處理,這100T的數(shù)據(jù)是一個主題,太大了,就考慮分區(qū),分成3個區(qū),每個分區(qū)分到不同的kafka服務(wù)器上,一個分區(qū)存33T。Consumer Group:消費者組。由多個消費者組成,消費者就是來取kafka中的數(shù)據(jù)來處理使用的?,F(xiàn)在kafka已經(jīng)存儲了100T的數(shù)據(jù),假如一個消費者來取使用,肯定比較慢,所以就可以引入多個消費者一起來取數(shù)據(jù)處理。一個分區(qū)中的數(shù)據(jù)只能由一個消費者Replica:副本。每個分區(qū)可以設(shè)置一個或多個副本,我理解是副本會同步主分區(qū)中的數(shù)據(jù),假如主分區(qū)掛了,副本就可以頂上去了。Leader:領(lǐng)導(dǎo)。主分區(qū),所有副本分區(qū)中的主分區(qū),生產(chǎn)者和消費者都只操作主分區(qū)。Follower:除了主分區(qū),其他的副本分區(qū)都是Follower,F(xiàn)ollower會從Leader中同步數(shù)據(jù),當Leader掛了,某個Follower會成為新的Leader。zookeeeper: ZooKeeper 用于協(xié)調(diào)和管理 Kafka 集群的各個組件,包括 Broker、Topic 配置、分區(qū)分配、Leader 選舉等。Kafka 使用 ZooKeeper 來維護集群的整體狀態(tài)和配置信息,以確保各個組件之間的協(xié)同工作。
生產(chǎn)者
在消息發(fā)送的過程中,涉及到了兩個線程——main?線程和?Sender?線程。在 main 線程中創(chuàng)建了一個雙端隊列?RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator, Sender 線程不斷從RecordAccumulator 中拉取消息發(fā)送到Kafka Broker。
生產(chǎn)者的main線程
main線程先創(chuàng)建Producer對象,然后調(diào)用send方法,數(shù)據(jù)會經(jīng)過攔截器,進行過濾處理,如果不需要可以不設(shè)置攔截器,攔截器用的較少。接著經(jīng)過序列化器對數(shù)據(jù)進行序列化(在網(wǎng)絡(luò)中傳輸數(shù)據(jù)需要序列化將數(shù)據(jù)轉(zhuǎn)成通用的字節(jié)流便于網(wǎng)絡(luò)傳輸),然后經(jīng)過分區(qū)器,分區(qū)器決定每條數(shù)據(jù)要發(fā)往哪個分區(qū),然后將每條數(shù)據(jù)發(fā)給對應(yīng)的分區(qū),一個分區(qū)對應(yīng)一個DQuee(雙端隊列),隊列中會有一批一批數(shù)據(jù),一批數(shù)據(jù)默認大小是16k。
總的來說,main線程將數(shù)據(jù)發(fā)到RecordAccumulator記錄累加器中,默認大小是32m,這個是在內(nèi)存中,起到緩存的作用,將大量的數(shù)據(jù)一批一批發(fā)給kafka,提高網(wǎng)絡(luò)傳輸速率。累加器使用有限的內(nèi)存,當內(nèi)存耗盡時(生成者產(chǎn)生數(shù)據(jù)的速度超過發(fā)送給服務(wù)器的速度),追加調(diào)用將阻塞,除非顯式禁用此行為。
sender線程,負責(zé)將數(shù)據(jù)發(fā)給kafka。數(shù)據(jù)是分批次發(fā)給kafka,當一個批次的數(shù)據(jù)達到16k或等待的時間達到linger.ms設(shè)置的時間,一個批次的數(shù)據(jù)就會被sender發(fā)給kafka,一個批次就是分區(qū)隊列中那個小正方形。
sender發(fā)送數(shù)據(jù):broker1(request1,request2,request3,request4,request5),每個kafka節(jié)點維護一個發(fā)送數(shù)據(jù)的請求緩存,這個請求緩存最多緩存5個請求,如果請求發(fā)送失敗了,會使用后面的請求繼續(xù)發(fā)。批數(shù)據(jù)到達對應(yīng)的broker后,會先同步副本。
生產(chǎn)者分區(qū)
分區(qū)好處:
1)便于合理使用存儲資源,每個Partition在一個Broker上存儲,可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)存儲在多臺Broker上。合理控制分區(qū)的任務(wù),可以實現(xiàn)負載均衡的效果。
2)提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費者可以以分區(qū)為單位進行消費數(shù)據(jù),從而提高消費處理數(shù)據(jù)的速度。
生產(chǎn)者發(fā)送消息的分區(qū)策略
ProducerRecord是生產(chǎn)者發(fā)送數(shù)據(jù)的單位
自定義分區(qū)器
也可以自定義分區(qū)器,自己決定數(shù)據(jù)要發(fā)到哪個分區(qū)中
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
發(fā)送過來的數(shù)據(jù)中如果包含 atguigu,就發(fā)往 0 號分區(qū),不包含 atguigu,就發(fā)往 1 號分區(qū)
定義類實現(xiàn) Partitioner 接口,重寫 partition()方法。
* 1. 實現(xiàn)接口 Partitioner
* 2. 實現(xiàn) 3 個方法:partition,close,configure
* 3. 編寫 partition 方法,返回分區(qū)號
*/
public class MyPartitioner implements Partitioner {
/**
* 返回信息對應(yīng)的分區(qū)
* @param topic 主題
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字節(jié)數(shù)組
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字節(jié)數(shù)組
* @param cluster 集群元數(shù)據(jù)可以查看分區(qū)信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[]
keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取消息
String msgValue = value.toString();
// 創(chuàng)建 partition
int partition;
// 判斷消息是否包含 atguigu
if (msgValue.contains("atguigu")){
partition = 0;
}else {
partition = 1;
}
// 返回分區(qū)號
return partition;
}
// 關(guān)閉資源
@Override
public void close() {
}
// 配置方法
@Override
public void configure(Map
}
}
然后在生產(chǎn)者配置里加上自定義分區(qū)器
// 添加自定義分區(qū)器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
生產(chǎn)者如何提高吞吐量
如何提高生產(chǎn)者發(fā)送數(shù)據(jù)的速度,主要是調(diào)整以下四個參數(shù)
? batch.size:批次大小,默認16k
? linger.ms:等待時間,修改為5-100ms
默認是0ms,就是數(shù)據(jù)一到隊列中就發(fā)給broker,這樣的好處就是實時性好,但是效率低,一次發(fā)幾條數(shù)據(jù)總比一次發(fā)一條效率高。也不能改太大,太大時效性不好。
? compression.type:壓縮snappy
壓縮數(shù)據(jù),這樣一批次就可以存更多的數(shù)據(jù)
? RecordAccumulator:緩沖區(qū)大小,可修改為64m
生產(chǎn)者數(shù)據(jù)可靠性
主要是當broker收到數(shù)據(jù)后的應(yīng)答機制
ISR隊列是只一個分區(qū)的Leader和所有的Followers的集合,ISR(0,1,2),為了解決那個問題,如果Leader長時間沒收到某個Follower同步數(shù)據(jù)的請求,就會認為這個Follower故障了,就會從ISR隊列中踢出這個Follower,ISR(0,1)。
如果Follower長時間未向Leader發(fā)送通信請求或同步數(shù)據(jù),則 該Follower將被踢出ISR。該時間閾值由replica.lag.time.max.ms參 數(shù)設(shè)定,默認30s。例如2超時,(leader:0, isr:0,1)。
數(shù)據(jù)完全可靠條件 = ACK級別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2
數(shù)據(jù)可靠性越強,效率越慢
// 設(shè)置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重試次數(shù) retries,默認是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
消費者
消費者消費一個分區(qū)中的數(shù)據(jù)時,會跟蹤他們自己消費到的偏移量,Kafka 會定期將偏移量提交到 Kafka 主題中的特殊主題(__consumer_offsets)中,這樣,消費者如果停止或重新啟動后,會從上次的偏移量繼續(xù)消費。偏移量是每條消息在分區(qū)中的位置。
消費者組
由多個kafka消費者組成的一組消費者組,用于同時消費處理kafka中一個主題所有分區(qū)中的數(shù)據(jù)。只要在創(chuàng)建kafka消費者的時候?qū)roup id設(shè)置成一樣的,那么就可以創(chuàng)建多個消費者構(gòu)成消費者組了。
一個主題的一個分區(qū)只能由一個消費者組內(nèi)的一個消費者處理,否則會導(dǎo)致數(shù)據(jù)重復(fù)消費。一個消費者組的每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù)。
消費者組的好處:加快消費處理數(shù)據(jù)的速度,橫向提高整個消費能力。如下圖,一開始就一個消費者c1,他要自己一個人消費處理來自topicA主題的四個分區(qū)的數(shù)據(jù),而我們可以增加三個消費者c2、c3、c4和c1構(gòu)成一個消費者組來同時消費處理topicA主題的四個分區(qū)的數(shù)據(jù),這樣消費處理數(shù)據(jù)的速度就提升了。(前提是有多個分區(qū))
主題
上面那樣肯定不好,各種消息的的生產(chǎn)者(生產(chǎn)圓蛋蛋、生產(chǎn)方框框、生產(chǎn)小心心)將消息都發(fā)給kafka,然后kafka將消息都分類,每種分類都有相應(yīng)的主題,然后消費者根據(jù)需要訂閱相應(yīng)的主題。就能收到對應(yīng)的消息。
分區(qū)
如果一個主題的消息比較多,就可以考慮分區(qū),分區(qū)可以分布在不同的服務(wù)器上,所以主題也可以分布在不同的服務(wù)器上,這樣比單服務(wù)器處理快。
如果生成者沒有指定分區(qū),分區(qū)器就會根據(jù)每條消息的鍵算出消息該去哪個分區(qū)。鍵:就是每條消息的一個標記,決定了消息該去哪個分區(qū)。分區(qū)器:就是一個算法,算消息該去哪個分區(qū),輸入是鍵,輸出是消息去的分區(qū)。
偏移量
偏移量就是消息在每個分區(qū)中的位置,kafka在收到消息的時候,會為每個消息設(shè)置偏移量,然后將消息存到磁盤中。
消費者只能按順序消費讀取。消費者如果要分區(qū)0的第四個,kafka就會說第三個還沒讀取,不給第四個。
kafka集群
一個broker就是一個kafka服務(wù)器。下面有兩個broker構(gòu)成了kafka集群,他們的數(shù)據(jù)通過復(fù)制同步,當有一個kafka宕機了,另一臺就可以先頂上,保證了kafka的可靠性。
監(jiān)控kafka
這個前提得先安裝jdk
1、修改kafka的啟動腳本
vim bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
改為
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G
-XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200
-XX:ParallelGCThreads=8 -XX:ConcGCThreads=5
-XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
修改kafka進程信息:
-Xms2G:設(shè)置 Kafka 進程的初始堆內(nèi)存大小為 2 GB。
-Xmx2G:設(shè)置 Kafka 進程的最大堆內(nèi)存大小為 2 GB。
XX:PermSize=128m:設(shè)置持久代(PermGen)的初始大小為 128 MB。請注意,這個選項在 Java 8 和更新的版本中不再適用,因為 PermGen 已被 Metaspace 取代。
-XX:+UseG1GC:指定使用 G1 垃圾收集器。
-XX:MaxGCPauseMillis=200:設(shè)置最大垃圾收集暫停時間為 200 毫秒。
XX:ParallelGCThreads=8:設(shè)置并行垃圾收集線程的數(shù)量為 8。
XX:ConcGCThreads=5:設(shè)置并發(fā)垃圾收集線程的數(shù)量為 5。
XX:InitiatingHeapOccupancyPercent=70:設(shè)置堆內(nèi)存占用百分比,當堆內(nèi)存使用達到 70% 時,啟動并發(fā)垃圾收集。
這些參數(shù)的目的是調(diào)整 Kafka 進程的性能和垃圾收集行為,以滿足特定的性能需求。請注意,這些參數(shù)的值可以根據(jù)你的 Kafka 部署和硬件資源進行調(diào)整。堆內(nèi)存的大小和垃圾收集器的選擇將影響 Kafka 的性能和穩(wěn)定性。
最后,這段腳本還設(shè)置了 JMX 端口為 9999,這是用于監(jiān)控 Kafka 進程的 Java Management Extensions(JMX)端口。通過此端口,你可以使用 JMX 工具監(jiān)控 Kafka 進程的性能指標和狀態(tài)。如果需要監(jiān)控 Kafka,你可以使用 JMX 工具連接到此端口。
2、官網(wǎng)下載安裝包
https://www.kafka-eagle.org/
3、上傳解壓
第一次解壓后,里面有個壓縮包再解壓才是真正的。
/opt/module/efak/conf/system-config.properties
5、配置環(huán)境變量
$ sudo vim /etc/profile.d/my_env.sh
# kafkaEFAK
export KE_HOME=/opt/module/efak
export PATH=$PATH:$KE_HOME/bin
source /etc/profile
6、啟動
/bin/kf.sh start
壓力測試
# 單Kafka服務(wù)器,生成者發(fā)送1000000條數(shù)據(jù),每條大小1k,總共發(fā)送大約
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=linjl:9092 batch.size=16384 linger.ms=0
batch.size=16384 linger.ms=0 9.76 MB/sec
record-size 是一條信息有多大,單位是字節(jié),本次測試設(shè)置為 1k。
BUG
1、Error while fetching metadata with correlation id : {LEADER_NOT_AVAILABLE}
2、
[root@linjl kafka_2.12-3.0.0]# ./bin/kafka-console-consumer.sh --topic quickstart-events --bootstrap-server linjl:9092
[2023-09-13 16:51:54,710] WARN [Consumer clientId=consumer-console-consumer-32025-1, groupId=console-consumer-32025] Error while fetching metadata with correlation id 2 : {quickstart-events=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
這個警告消息 “Error while fetching metadata with correlation id 2 : {quickstart-events=LEADER_NOT_AVAILABLE}” 表示 Kafka 消費者在嘗試訂閱主題 “quickstart-events” 時遇到了 “LEADER_NOT_AVAILABLE” 錯誤。這個錯誤通常表示消費者無法找到主題的 leader 分區(qū),因此它無法讀取消息。
我的猜想: 可能是因為 Kafka 服務(wù)器無法從 ZooKeeper 獲取到有關(guān) “quickstart-events” 主題的元數(shù)據(jù)信息,包括分區(qū)的 Leader 信息。
3、Received invalid metadata error in produce request on partition quickstart-events-0
due to org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk… Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
表示在嘗試將消息寫入分區(qū) “quickstart-events-0” 時,Kafka 生產(chǎn)者遇到了磁盤錯誤,無法訪問日志文件。這個錯誤通常與磁盤故障或磁盤空間不足有關(guān)。
4、Java客戶端創(chuàng)建生產(chǎn)者,發(fā)送消息給kafka沒響應(yīng)。
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.128:9092");
// key,value 序列化(必須):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
網(wǎng)絡(luò)連接都能通,而且防火墻也都關(guān)了。
解決:在server.properties配置文件中配置
# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.239.128:9092
kafka和flink結(jié)合案例
數(shù)據(jù)寫入kafka,flink訂閱消費
安裝kafka單服務(wù)
1、官方下載地址:http://kafka.apache.org/downloads.html
2、解壓安裝包
下載完將安裝包上傳到centos中,然后解壓
$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
3、 修改解壓后的文件名稱
$ mv kafka_2.12-3.0.0/ kafka
4、進入到/opt/module/kafka 目錄,修改配置文件
$ cd config/
$ vim server.properties
#broker 的全局唯一編號,不能重復(fù),只能是數(shù)字。
broker.id=0
#處理網(wǎng)絡(luò)請求的線程數(shù)量
num.network.threads=3
#用來處理磁盤 IO 的線程數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka 運行日志(數(shù)據(jù))存放的路徑,路徑不需要提前創(chuàng)建,kafka 自動幫你創(chuàng)建,可以配置多個磁盤路徑,路徑與路徑之間可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic 在當前 broker 上的分區(qū)個數(shù)
num.partitions=1
#用來恢復(fù)和清理 data 下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
# 每個 topic 創(chuàng)建時的副本數(shù),默認時 1 個副本
offsets.topic.replication.factor=1
#segment 文件保留的最長時間,超時將被刪除
log.retention.hours=168
#每個 segment 文件的大小,默認最大 1G
log.segment.bytes=1073741824
# 檢查過期數(shù)據(jù)的時間,默認 5 分鐘檢查一次是否數(shù)據(jù)過期
log.retention.check.interval.ms=300000
#配置連接Zookeeper 集群地址(在 zk 根目錄下創(chuàng)建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
5、配置kafka環(huán)境變量
vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
刷新
$ source /etc/profile
6、啟動kafka
./kafka/bin/kafka-server-start.sh
創(chuàng)建生產(chǎn)者,將數(shù)據(jù)寫入kafka
柚子快報邀請碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記
文章鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。