柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記
柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記
Kafka
Kafka 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(Message Queue),主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。
Kafka可以用作Flink應(yīng)用程序的數(shù)據(jù)源。Flink可以輕松地從一個(gè)或多個(gè)Kafka主題中消費(fèi)數(shù)據(jù)流。這意味著您可以使用Kafka來(lái)捕獲和傳輸實(shí)時(shí)數(shù)據(jù),并將其發(fā)送到Flink進(jìn)行進(jìn)一步處理。
Flink和Kafka在實(shí)時(shí)數(shù)據(jù)處理和流處理應(yīng)用程序中通常協(xié)同工作,Kafka用于數(shù)據(jù)傳輸和捕獲,而Flink用于數(shù)據(jù)處理和分析。
Kafka由 生產(chǎn)者 Broker 消費(fèi)者組成,生產(chǎn)者和消費(fèi)者是由Java語(yǔ)言編寫(xiě)的,Broker由Scala語(yǔ)言寫(xiě)的。
基礎(chǔ)架構(gòu)
Producer:kafka 生產(chǎn)者,用于接收外部數(shù)據(jù),然后將數(shù)據(jù)發(fā)送給kafka集群存儲(chǔ),假如要發(fā)100T的數(shù)據(jù)。Consumer:消費(fèi)者,就是到kafka中取數(shù)據(jù)的客戶(hù)端,比如:flink就是一個(gè)消費(fèi)者,到kafka中取出數(shù)據(jù)計(jì)算處理。Broker:一個(gè)Broker就是一個(gè)kafka服務(wù)器,如果你在一個(gè)虛擬機(jī)上安裝了kafka,那么這個(gè)虛擬機(jī)就是一個(gè)Broker。Partition:分區(qū)。前面說(shuō)了,要發(fā)送100T的數(shù)據(jù)給kafka,那如果只用一臺(tái)kafka服務(wù)器(Broker)接收肯定不好,太大了。所以就有了kafka集群一起處理,這100T的數(shù)據(jù)是一個(gè)主題,太大了,就考慮分區(qū),分成3個(gè)區(qū),每個(gè)分區(qū)分到不同的kafka服務(wù)器上,一個(gè)分區(qū)存33T。Consumer Group:消費(fèi)者組。由多個(gè)消費(fèi)者組成,消費(fèi)者就是來(lái)取kafka中的數(shù)據(jù)來(lái)處理使用的?,F(xiàn)在kafka已經(jīng)存儲(chǔ)了100T的數(shù)據(jù),假如一個(gè)消費(fèi)者來(lái)取使用,肯定比較慢,所以就可以引入多個(gè)消費(fèi)者一起來(lái)取數(shù)據(jù)處理。一個(gè)分區(qū)中的數(shù)據(jù)只能由一個(gè)消費(fèi)者Replica:副本。每個(gè)分區(qū)可以設(shè)置一個(gè)或多個(gè)副本,我理解是副本會(huì)同步主分區(qū)中的數(shù)據(jù),假如主分區(qū)掛了,副本就可以頂上去了。Leader:領(lǐng)導(dǎo)。主分區(qū),所有副本分區(qū)中的主分區(qū),生產(chǎn)者和消費(fèi)者都只操作主分區(qū)。Follower:除了主分區(qū),其他的副本分區(qū)都是Follower,F(xiàn)ollower會(huì)從Leader中同步數(shù)據(jù),當(dāng)Leader掛了,某個(gè)Follower會(huì)成為新的Leader。zookeeeper: ZooKeeper 用于協(xié)調(diào)和管理 Kafka 集群的各個(gè)組件,包括 Broker、Topic 配置、分區(qū)分配、Leader 選舉等。Kafka 使用 ZooKeeper 來(lái)維護(hù)集群的整體狀態(tài)和配置信息,以確保各個(gè)組件之間的協(xié)同工作。
生產(chǎn)者
在消息發(fā)送的過(guò)程中,涉及到了兩個(gè)線(xiàn)程——main?線(xiàn)程和?Sender?線(xiàn)程。在 main 線(xiàn)程中創(chuàng)建了一個(gè)雙端隊(duì)列?RecordAccumulator。main 線(xiàn)程將消息發(fā)送給 RecordAccumulator, Sender 線(xiàn)程不斷從RecordAccumulator 中拉取消息發(fā)送到Kafka Broker。
生產(chǎn)者的main線(xiàn)程
main線(xiàn)程先創(chuàng)建Producer對(duì)象,然后調(diào)用send方法,數(shù)據(jù)會(huì)經(jīng)過(guò)攔截器,進(jìn)行過(guò)濾處理,如果不需要可以不設(shè)置攔截器,攔截器用的較少。接著經(jīng)過(guò)序列化器對(duì)數(shù)據(jù)進(jìn)行序列化(在網(wǎng)絡(luò)中傳輸數(shù)據(jù)需要序列化將數(shù)據(jù)轉(zhuǎn)成通用的字節(jié)流便于網(wǎng)絡(luò)傳輸),然后經(jīng)過(guò)分區(qū)器,分區(qū)器決定每條數(shù)據(jù)要發(fā)往哪個(gè)分區(qū),然后將每條數(shù)據(jù)發(fā)給對(duì)應(yīng)的分區(qū),一個(gè)分區(qū)對(duì)應(yīng)一個(gè)DQuee(雙端隊(duì)列),隊(duì)列中會(huì)有一批一批數(shù)據(jù),一批數(shù)據(jù)默認(rèn)大小是16k。
總的來(lái)說(shuō),main線(xiàn)程將數(shù)據(jù)發(fā)到RecordAccumulator記錄累加器中,默認(rèn)大小是32m,這個(gè)是在內(nèi)存中,起到緩存的作用,將大量的數(shù)據(jù)一批一批發(fā)給kafka,提高網(wǎng)絡(luò)傳輸速率。累加器使用有限的內(nèi)存,當(dāng)內(nèi)存耗盡時(shí)(生成者產(chǎn)生數(shù)據(jù)的速度超過(guò)發(fā)送給服務(wù)器的速度),追加調(diào)用將阻塞,除非顯式禁用此行為。
sender線(xiàn)程,負(fù)責(zé)將數(shù)據(jù)發(fā)給kafka。數(shù)據(jù)是分批次發(fā)給kafka,當(dāng)一個(gè)批次的數(shù)據(jù)達(dá)到16k或等待的時(shí)間達(dá)到linger.ms設(shè)置的時(shí)間,一個(gè)批次的數(shù)據(jù)就會(huì)被sender發(fā)給kafka,一個(gè)批次就是分區(qū)隊(duì)列中那個(gè)小正方形。
sender發(fā)送數(shù)據(jù):broker1(request1,request2,request3,request4,request5),每個(gè)kafka節(jié)點(diǎn)維護(hù)一個(gè)發(fā)送數(shù)據(jù)的請(qǐng)求緩存,這個(gè)請(qǐng)求緩存最多緩存5個(gè)請(qǐng)求,如果請(qǐng)求發(fā)送失敗了,會(huì)使用后面的請(qǐng)求繼續(xù)發(fā)。批數(shù)據(jù)到達(dá)對(duì)應(yīng)的broker后,會(huì)先同步副本。
生產(chǎn)者分區(qū)
分區(qū)好處:
1)便于合理使用存儲(chǔ)資源,每個(gè)Partition在一個(gè)Broker上存儲(chǔ),可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)存儲(chǔ)在多臺(tái)Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果。
2)提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù),從而提高消費(fèi)處理數(shù)據(jù)的速度。
生產(chǎn)者發(fā)送消息的分區(qū)策略
ProducerRecord是生產(chǎn)者發(fā)送數(shù)據(jù)的單位
自定義分區(qū)器
也可以自定義分區(qū)器,自己決定數(shù)據(jù)要發(fā)到哪個(gè)分區(qū)中
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
發(fā)送過(guò)來(lái)的數(shù)據(jù)中如果包含 atguigu,就發(fā)往 0 號(hào)分區(qū),不包含 atguigu,就發(fā)往 1 號(hào)分區(qū)
定義類(lèi)實(shí)現(xiàn) Partitioner 接口,重寫(xiě) partition()方法。
* 1. 實(shí)現(xiàn)接口 Partitioner
* 2. 實(shí)現(xiàn) 3 個(gè)方法:partition,close,configure
* 3. 編寫(xiě) partition 方法,返回分區(qū)號(hào)
*/
public class MyPartitioner implements Partitioner {
/**
* 返回信息對(duì)應(yīng)的分區(qū)
* @param topic 主題
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字節(jié)數(shù)組
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字節(jié)數(shù)組
* @param cluster 集群元數(shù)據(jù)可以查看分區(qū)信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[]
keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取消息
String msgValue = value.toString();
// 創(chuàng)建 partition
int partition;
// 判斷消息是否包含 atguigu
if (msgValue.contains("atguigu")){
partition = 0;
}else {
partition = 1;
}
// 返回分區(qū)號(hào)
return partition;
}
// 關(guān)閉資源
@Override
public void close() {
}
// 配置方法
@Override
public void configure(Map
}
}
然后在生產(chǎn)者配置里加上自定義分區(qū)器
// 添加自定義分區(qū)器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
生產(chǎn)者如何提高吞吐量
如何提高生產(chǎn)者發(fā)送數(shù)據(jù)的速度,主要是調(diào)整以下四個(gè)參數(shù)
? batch.size:批次大小,默認(rèn)16k
? linger.ms:等待時(shí)間,修改為5-100ms
默認(rèn)是0ms,就是數(shù)據(jù)一到隊(duì)列中就發(fā)給broker,這樣的好處就是實(shí)時(shí)性好,但是效率低,一次發(fā)幾條數(shù)據(jù)總比一次發(fā)一條效率高。也不能改太大,太大時(shí)效性不好。
? compression.type:壓縮snappy
壓縮數(shù)據(jù),這樣一批次就可以存更多的數(shù)據(jù)
? RecordAccumulator:緩沖區(qū)大小,可修改為64m
生產(chǎn)者數(shù)據(jù)可靠性
主要是當(dāng)broker收到數(shù)據(jù)后的應(yīng)答機(jī)制
ISR隊(duì)列是只一個(gè)分區(qū)的Leader和所有的Followers的集合,ISR(0,1,2),為了解決那個(gè)問(wèn)題,如果Leader長(zhǎng)時(shí)間沒(méi)收到某個(gè)Follower同步數(shù)據(jù)的請(qǐng)求,就會(huì)認(rèn)為這個(gè)Follower故障了,就會(huì)從ISR隊(duì)列中踢出這個(gè)Follower,ISR(0,1)。
如果Follower長(zhǎng)時(shí)間未向Leader發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則 該Follower將被踢出ISR。該時(shí)間閾值由replica.lag.time.max.ms參 數(shù)設(shè)定,默認(rèn)30s。例如2超時(shí),(leader:0, isr:0,1)。
數(shù)據(jù)完全可靠條件 = ACK級(jí)別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2
數(shù)據(jù)可靠性越強(qiáng),效率越慢
// 設(shè)置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重試次數(shù) retries,默認(rèn)是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
消費(fèi)者
消費(fèi)者消費(fèi)一個(gè)分區(qū)中的數(shù)據(jù)時(shí),會(huì)跟蹤他們自己消費(fèi)到的偏移量,Kafka 會(huì)定期將偏移量提交到 Kafka 主題中的特殊主題(__consumer_offsets)中,這樣,消費(fèi)者如果停止或重新啟動(dòng)后,會(huì)從上次的偏移量繼續(xù)消費(fèi)。偏移量是每條消息在分區(qū)中的位置。
消費(fèi)者組
由多個(gè)kafka消費(fèi)者組成的一組消費(fèi)者組,用于同時(shí)消費(fèi)處理kafka中一個(gè)主題所有分區(qū)中的數(shù)據(jù)。只要在創(chuàng)建kafka消費(fèi)者的時(shí)候?qū)roup id設(shè)置成一樣的,那么就可以創(chuàng)建多個(gè)消費(fèi)者構(gòu)成消費(fèi)者組了。
一個(gè)主題的一個(gè)分區(qū)只能由一個(gè)消費(fèi)者組內(nèi)的一個(gè)消費(fèi)者處理,否則會(huì)導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi)。一個(gè)消費(fèi)者組的每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù)。
消費(fèi)者組的好處:加快消費(fèi)處理數(shù)據(jù)的速度,橫向提高整個(gè)消費(fèi)能力。如下圖,一開(kāi)始就一個(gè)消費(fèi)者c1,他要自己一個(gè)人消費(fèi)處理來(lái)自topicA主題的四個(gè)分區(qū)的數(shù)據(jù),而我們可以增加三個(gè)消費(fèi)者c2、c3、c4和c1構(gòu)成一個(gè)消費(fèi)者組來(lái)同時(shí)消費(fèi)處理topicA主題的四個(gè)分區(qū)的數(shù)據(jù),這樣消費(fèi)處理數(shù)據(jù)的速度就提升了。(前提是有多個(gè)分區(qū))
主題
上面那樣肯定不好,各種消息的的生產(chǎn)者(生產(chǎn)圓蛋蛋、生產(chǎn)方框框、生產(chǎn)小心心)將消息都發(fā)給kafka,然后kafka將消息都分類(lèi),每種分類(lèi)都有相應(yīng)的主題,然后消費(fèi)者根據(jù)需要訂閱相應(yīng)的主題。就能收到對(duì)應(yīng)的消息。
分區(qū)
如果一個(gè)主題的消息比較多,就可以考慮分區(qū),分區(qū)可以分布在不同的服務(wù)器上,所以主題也可以分布在不同的服務(wù)器上,這樣比單服務(wù)器處理快。
如果生成者沒(méi)有指定分區(qū),分區(qū)器就會(huì)根據(jù)每條消息的鍵算出消息該去哪個(gè)分區(qū)。鍵:就是每條消息的一個(gè)標(biāo)記,決定了消息該去哪個(gè)分區(qū)。分區(qū)器:就是一個(gè)算法,算消息該去哪個(gè)分區(qū),輸入是鍵,輸出是消息去的分區(qū)。
偏移量
偏移量就是消息在每個(gè)分區(qū)中的位置,kafka在收到消息的時(shí)候,會(huì)為每個(gè)消息設(shè)置偏移量,然后將消息存到磁盤(pán)中。
消費(fèi)者只能按順序消費(fèi)讀取。消費(fèi)者如果要分區(qū)0的第四個(gè),kafka就會(huì)說(shuō)第三個(gè)還沒(méi)讀取,不給第四個(gè)。
kafka集群
一個(gè)broker就是一個(gè)kafka服務(wù)器。下面有兩個(gè)broker構(gòu)成了kafka集群,他們的數(shù)據(jù)通過(guò)復(fù)制同步,當(dāng)有一個(gè)kafka宕機(jī)了,另一臺(tái)就可以先頂上,保證了kafka的可靠性。
監(jiān)控kafka
這個(gè)前提得先安裝jdk
1、修改kafka的啟動(dòng)腳本
vim bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
改為
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G
-XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200
-XX:ParallelGCThreads=8 -XX:ConcGCThreads=5
-XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
修改kafka進(jìn)程信息:
-Xms2G:設(shè)置 Kafka 進(jìn)程的初始堆內(nèi)存大小為 2 GB。
-Xmx2G:設(shè)置 Kafka 進(jìn)程的最大堆內(nèi)存大小為 2 GB。
XX:PermSize=128m:設(shè)置持久代(PermGen)的初始大小為 128 MB。請(qǐng)注意,這個(gè)選項(xiàng)在 Java 8 和更新的版本中不再適用,因?yàn)?PermGen 已被 Metaspace 取代。
-XX:+UseG1GC:指定使用 G1 垃圾收集器。
-XX:MaxGCPauseMillis=200:設(shè)置最大垃圾收集暫停時(shí)間為 200 毫秒。
XX:ParallelGCThreads=8:設(shè)置并行垃圾收集線(xiàn)程的數(shù)量為 8。
XX:ConcGCThreads=5:設(shè)置并發(fā)垃圾收集線(xiàn)程的數(shù)量為 5。
XX:InitiatingHeapOccupancyPercent=70:設(shè)置堆內(nèi)存占用百分比,當(dāng)堆內(nèi)存使用達(dá)到 70% 時(shí),啟動(dòng)并發(fā)垃圾收集。
這些參數(shù)的目的是調(diào)整 Kafka 進(jìn)程的性能和垃圾收集行為,以滿(mǎn)足特定的性能需求。請(qǐng)注意,這些參數(shù)的值可以根據(jù)你的 Kafka 部署和硬件資源進(jìn)行調(diào)整。堆內(nèi)存的大小和垃圾收集器的選擇將影響 Kafka 的性能和穩(wěn)定性。
最后,這段腳本還設(shè)置了 JMX 端口為 9999,這是用于監(jiān)控 Kafka 進(jìn)程的 Java Management Extensions(JMX)端口。通過(guò)此端口,你可以使用 JMX 工具監(jiān)控 Kafka 進(jìn)程的性能指標(biāo)和狀態(tài)。如果需要監(jiān)控 Kafka,你可以使用 JMX 工具連接到此端口。
2、官網(wǎng)下載安裝包
https://www.kafka-eagle.org/
3、上傳解壓
第一次解壓后,里面有個(gè)壓縮包再解壓才是真正的。
/opt/module/efak/conf/system-config.properties
5、配置環(huán)境變量
$ sudo vim /etc/profile.d/my_env.sh
# kafkaEFAK
export KE_HOME=/opt/module/efak
export PATH=$PATH:$KE_HOME/bin
source /etc/profile
6、啟動(dòng)
/bin/kf.sh start
壓力測(cè)試
# 單Kafka服務(wù)器,生成者發(fā)送1000000條數(shù)據(jù),每條大小1k,總共發(fā)送大約
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=linjl:9092 batch.size=16384 linger.ms=0
batch.size=16384 linger.ms=0 9.76 MB/sec
record-size 是一條信息有多大,單位是字節(jié),本次測(cè)試設(shè)置為 1k。
BUG
1、Error while fetching metadata with correlation id : {LEADER_NOT_AVAILABLE}
2、
[root@linjl kafka_2.12-3.0.0]# ./bin/kafka-console-consumer.sh --topic quickstart-events --bootstrap-server linjl:9092
[2023-09-13 16:51:54,710] WARN [Consumer clientId=consumer-console-consumer-32025-1, groupId=console-consumer-32025] Error while fetching metadata with correlation id 2 : {quickstart-events=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
這個(gè)警告消息 “Error while fetching metadata with correlation id 2 : {quickstart-events=LEADER_NOT_AVAILABLE}” 表示 Kafka 消費(fèi)者在嘗試訂閱主題 “quickstart-events” 時(shí)遇到了 “LEADER_NOT_AVAILABLE” 錯(cuò)誤。這個(gè)錯(cuò)誤通常表示消費(fèi)者無(wú)法找到主題的 leader 分區(qū),因此它無(wú)法讀取消息。
我的猜想: 可能是因?yàn)?Kafka 服務(wù)器無(wú)法從 ZooKeeper 獲取到有關(guān) “quickstart-events” 主題的元數(shù)據(jù)信息,包括分區(qū)的 Leader 信息。
3、Received invalid metadata error in produce request on partition quickstart-events-0
due to org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk… Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
表示在嘗試將消息寫(xiě)入分區(qū) “quickstart-events-0” 時(shí),Kafka 生產(chǎn)者遇到了磁盤(pán)錯(cuò)誤,無(wú)法訪(fǎng)問(wèn)日志文件。這個(gè)錯(cuò)誤通常與磁盤(pán)故障或磁盤(pán)空間不足有關(guān)。
4、Java客戶(hù)端創(chuàng)建生產(chǎn)者,發(fā)送消息給kafka沒(méi)響應(yīng)。
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.128:9092");
// key,value 序列化(必須):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
網(wǎng)絡(luò)連接都能通,而且防火墻也都關(guān)了。
解決:在server.properties配置文件中配置
# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.239.128:9092
kafka和flink結(jié)合案例
數(shù)據(jù)寫(xiě)入kafka,flink訂閱消費(fèi)
安裝kafka單服務(wù)
1、官方下載地址:http://kafka.apache.org/downloads.html
2、解壓安裝包
下載完將安裝包上傳到centos中,然后解壓
$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
3、 修改解壓后的文件名稱(chēng)
$ mv kafka_2.12-3.0.0/ kafka
4、進(jìn)入到/opt/module/kafka 目錄,修改配置文件
$ cd config/
$ vim server.properties
#broker 的全局唯一編號(hào),不能重復(fù),只能是數(shù)字。
broker.id=0
#處理網(wǎng)絡(luò)請(qǐng)求的線(xiàn)程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤(pán) IO 的線(xiàn)程數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka 運(yùn)行日志(數(shù)據(jù))存放的路徑,路徑不需要提前創(chuàng)建,kafka 自動(dòng)幫你創(chuàng)建,可以配置多個(gè)磁盤(pán)路徑,路徑與路徑之間可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic 在當(dāng)前 broker 上的分區(qū)個(gè)數(shù)
num.partitions=1
#用來(lái)恢復(fù)和清理 data 下數(shù)據(jù)的線(xiàn)程數(shù)量
num.recovery.threads.per.data.dir=1
# 每個(gè) topic 創(chuàng)建時(shí)的副本數(shù),默認(rèn)時(shí) 1 個(gè)副本
offsets.topic.replication.factor=1
#segment 文件保留的最長(zhǎng)時(shí)間,超時(shí)將被刪除
log.retention.hours=168
#每個(gè) segment 文件的大小,默認(rèn)最大 1G
log.segment.bytes=1073741824
# 檢查過(guò)期數(shù)據(jù)的時(shí)間,默認(rèn) 5 分鐘檢查一次是否數(shù)據(jù)過(guò)期
log.retention.check.interval.ms=300000
#配置連接Zookeeper 集群地址(在 zk 根目錄下創(chuàng)建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
5、配置kafka環(huán)境變量
vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
刷新
$ source /etc/profile
6、啟動(dòng)kafka
./kafka/bin/kafka-server-start.sh
創(chuàng)建生產(chǎn)者,將數(shù)據(jù)寫(xiě)入kafka
柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記
文章鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。