欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記

柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記

http://yzkb.51969.com/

Kafka

Kafka 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(Message Queue),主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。

Kafka可以用作Flink應(yīng)用程序的數(shù)據(jù)源。Flink可以輕松地從一個(gè)或多個(gè)Kafka主題中消費(fèi)數(shù)據(jù)流。這意味著您可以使用Kafka來(lái)捕獲和傳輸實(shí)時(shí)數(shù)據(jù),并將其發(fā)送到Flink進(jìn)行進(jìn)一步處理。

Flink和Kafka在實(shí)時(shí)數(shù)據(jù)處理和流處理應(yīng)用程序中通常協(xié)同工作,Kafka用于數(shù)據(jù)傳輸和捕獲,而Flink用于數(shù)據(jù)處理和分析。

Kafka由 生產(chǎn)者 Broker 消費(fèi)者組成,生產(chǎn)者和消費(fèi)者是由Java語(yǔ)言編寫(xiě)的,Broker由Scala語(yǔ)言寫(xiě)的。

基礎(chǔ)架構(gòu)

Producer:kafka 生產(chǎn)者,用于接收外部數(shù)據(jù),然后將數(shù)據(jù)發(fā)送給kafka集群存儲(chǔ),假如要發(fā)100T的數(shù)據(jù)。Consumer:消費(fèi)者,就是到kafka中取數(shù)據(jù)的客戶(hù)端,比如:flink就是一個(gè)消費(fèi)者,到kafka中取出數(shù)據(jù)計(jì)算處理。Broker:一個(gè)Broker就是一個(gè)kafka服務(wù)器,如果你在一個(gè)虛擬機(jī)上安裝了kafka,那么這個(gè)虛擬機(jī)就是一個(gè)Broker。Partition:分區(qū)。前面說(shuō)了,要發(fā)送100T的數(shù)據(jù)給kafka,那如果只用一臺(tái)kafka服務(wù)器(Broker)接收肯定不好,太大了。所以就有了kafka集群一起處理,這100T的數(shù)據(jù)是一個(gè)主題,太大了,就考慮分區(qū),分成3個(gè)區(qū),每個(gè)分區(qū)分到不同的kafka服務(wù)器上,一個(gè)分區(qū)存33T。Consumer Group:消費(fèi)者組。由多個(gè)消費(fèi)者組成,消費(fèi)者就是來(lái)取kafka中的數(shù)據(jù)來(lái)處理使用的?,F(xiàn)在kafka已經(jīng)存儲(chǔ)了100T的數(shù)據(jù),假如一個(gè)消費(fèi)者來(lái)取使用,肯定比較慢,所以就可以引入多個(gè)消費(fèi)者一起來(lái)取數(shù)據(jù)處理。一個(gè)分區(qū)中的數(shù)據(jù)只能由一個(gè)消費(fèi)者Replica:副本。每個(gè)分區(qū)可以設(shè)置一個(gè)或多個(gè)副本,我理解是副本會(huì)同步主分區(qū)中的數(shù)據(jù),假如主分區(qū)掛了,副本就可以頂上去了。Leader:領(lǐng)導(dǎo)。主分區(qū),所有副本分區(qū)中的主分區(qū),生產(chǎn)者和消費(fèi)者都只操作主分區(qū)。Follower:除了主分區(qū),其他的副本分區(qū)都是Follower,F(xiàn)ollower會(huì)從Leader中同步數(shù)據(jù),當(dāng)Leader掛了,某個(gè)Follower會(huì)成為新的Leader。zookeeeper: ZooKeeper 用于協(xié)調(diào)和管理 Kafka 集群的各個(gè)組件,包括 Broker、Topic 配置、分區(qū)分配、Leader 選舉等。Kafka 使用 ZooKeeper 來(lái)維護(hù)集群的整體狀態(tài)和配置信息,以確保各個(gè)組件之間的協(xié)同工作。

生產(chǎn)者

在消息發(fā)送的過(guò)程中,涉及到了兩個(gè)線(xiàn)程——main?線(xiàn)程和?Sender?線(xiàn)程。在 main 線(xiàn)程中創(chuàng)建了一個(gè)雙端隊(duì)列?RecordAccumulator。main 線(xiàn)程將消息發(fā)送給 RecordAccumulator, Sender 線(xiàn)程不斷從RecordAccumulator 中拉取消息發(fā)送到Kafka Broker。

生產(chǎn)者的main線(xiàn)程

main線(xiàn)程先創(chuàng)建Producer對(duì)象,然后調(diào)用send方法,數(shù)據(jù)會(huì)經(jīng)過(guò)攔截器,進(jìn)行過(guò)濾處理,如果不需要可以不設(shè)置攔截器,攔截器用的較少。接著經(jīng)過(guò)序列化器對(duì)數(shù)據(jù)進(jìn)行序列化(在網(wǎng)絡(luò)中傳輸數(shù)據(jù)需要序列化將數(shù)據(jù)轉(zhuǎn)成通用的字節(jié)流便于網(wǎng)絡(luò)傳輸),然后經(jīng)過(guò)分區(qū)器,分區(qū)器決定每條數(shù)據(jù)要發(fā)往哪個(gè)分區(qū),然后將每條數(shù)據(jù)發(fā)給對(duì)應(yīng)的分區(qū),一個(gè)分區(qū)對(duì)應(yīng)一個(gè)DQuee(雙端隊(duì)列),隊(duì)列中會(huì)有一批一批數(shù)據(jù),一批數(shù)據(jù)默認(rèn)大小是16k。

總的來(lái)說(shuō),main線(xiàn)程將數(shù)據(jù)發(fā)到RecordAccumulator記錄累加器中,默認(rèn)大小是32m,這個(gè)是在內(nèi)存中,起到緩存的作用,將大量的數(shù)據(jù)一批一批發(fā)給kafka,提高網(wǎng)絡(luò)傳輸速率。累加器使用有限的內(nèi)存,當(dāng)內(nèi)存耗盡時(shí)(生成者產(chǎn)生數(shù)據(jù)的速度超過(guò)發(fā)送給服務(wù)器的速度),追加調(diào)用將阻塞,除非顯式禁用此行為。

sender線(xiàn)程,負(fù)責(zé)將數(shù)據(jù)發(fā)給kafka。數(shù)據(jù)是分批次發(fā)給kafka,當(dāng)一個(gè)批次的數(shù)據(jù)達(dá)到16k或等待的時(shí)間達(dá)到linger.ms設(shè)置的時(shí)間,一個(gè)批次的數(shù)據(jù)就會(huì)被sender發(fā)給kafka,一個(gè)批次就是分區(qū)隊(duì)列中那個(gè)小正方形。

sender發(fā)送數(shù)據(jù):broker1(request1,request2,request3,request4,request5),每個(gè)kafka節(jié)點(diǎn)維護(hù)一個(gè)發(fā)送數(shù)據(jù)的請(qǐng)求緩存,這個(gè)請(qǐng)求緩存最多緩存5個(gè)請(qǐng)求,如果請(qǐng)求發(fā)送失敗了,會(huì)使用后面的請(qǐng)求繼續(xù)發(fā)。批數(shù)據(jù)到達(dá)對(duì)應(yīng)的broker后,會(huì)先同步副本。

生產(chǎn)者分區(qū)

分區(qū)好處:

1)便于合理使用存儲(chǔ)資源,每個(gè)Partition在一個(gè)Broker上存儲(chǔ),可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)存儲(chǔ)在多臺(tái)Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果。

2)提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù),從而提高消費(fèi)處理數(shù)據(jù)的速度。

生產(chǎn)者發(fā)送消息的分區(qū)策略

ProducerRecord是生產(chǎn)者發(fā)送數(shù)據(jù)的單位

自定義分區(qū)器

也可以自定義分區(qū)器,自己決定數(shù)據(jù)要發(fā)到哪個(gè)分區(qū)中

import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;

import java.util.Map;

/**

發(fā)送過(guò)來(lái)的數(shù)據(jù)中如果包含 atguigu,就發(fā)往 0 號(hào)分區(qū),不包含 atguigu,就發(fā)往 1 號(hào)分區(qū)

定義類(lèi)實(shí)現(xiàn) Partitioner 接口,重寫(xiě) partition()方法。

* 1. 實(shí)現(xiàn)接口 Partitioner

* 2. 實(shí)現(xiàn) 3 個(gè)方法:partition,close,configure

* 3. 編寫(xiě) partition 方法,返回分區(qū)號(hào)

*/

public class MyPartitioner implements Partitioner {

/**

* 返回信息對(duì)應(yīng)的分區(qū)

* @param topic 主題

* @param key 消息的 key

* @param keyBytes 消息的 key 序列化后的字節(jié)數(shù)組

* @param value 消息的 value

* @param valueBytes 消息的 value 序列化后的字節(jié)數(shù)組

* @param cluster 集群元數(shù)據(jù)可以查看分區(qū)信息

* @return

*/

@Override

public int partition(String topic, Object key, byte[]

keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

// 獲取消息

String msgValue = value.toString();

// 創(chuàng)建 partition

int partition;

// 判斷消息是否包含 atguigu

if (msgValue.contains("atguigu")){

partition = 0;

}else {

partition = 1;

}

// 返回分區(qū)號(hào)

return partition;

}

// 關(guān)閉資源

@Override

public void close() {

}

// 配置方法

@Override

public void configure(Map configs) {

}

}

然后在生產(chǎn)者配置里加上自定義分區(qū)器

// 添加自定義分區(qū)器

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");

生產(chǎn)者如何提高吞吐量

如何提高生產(chǎn)者發(fā)送數(shù)據(jù)的速度,主要是調(diào)整以下四個(gè)參數(shù)

? batch.size:批次大小,默認(rèn)16k

? linger.ms:等待時(shí)間,修改為5-100ms

默認(rèn)是0ms,就是數(shù)據(jù)一到隊(duì)列中就發(fā)給broker,這樣的好處就是實(shí)時(shí)性好,但是效率低,一次發(fā)幾條數(shù)據(jù)總比一次發(fā)一條效率高。也不能改太大,太大時(shí)效性不好。

? compression.type:壓縮snappy

壓縮數(shù)據(jù),這樣一批次就可以存更多的數(shù)據(jù)

? RecordAccumulator:緩沖區(qū)大小,可修改為64m

生產(chǎn)者數(shù)據(jù)可靠性

主要是當(dāng)broker收到數(shù)據(jù)后的應(yīng)答機(jī)制

ISR隊(duì)列是只一個(gè)分區(qū)的Leader和所有的Followers的集合,ISR(0,1,2),為了解決那個(gè)問(wèn)題,如果Leader長(zhǎng)時(shí)間沒(méi)收到某個(gè)Follower同步數(shù)據(jù)的請(qǐng)求,就會(huì)認(rèn)為這個(gè)Follower故障了,就會(huì)從ISR隊(duì)列中踢出這個(gè)Follower,ISR(0,1)。

如果Follower長(zhǎng)時(shí)間未向Leader發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則 該Follower將被踢出ISR。該時(shí)間閾值由replica.lag.time.max.ms參 數(shù)設(shè)定,默認(rèn)30s。例如2超時(shí),(leader:0, isr:0,1)。

數(shù)據(jù)完全可靠條件 = ACK級(jí)別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2

數(shù)據(jù)可靠性越強(qiáng),效率越慢

// 設(shè)置 acks

properties.put(ProducerConfig.ACKS_CONFIG, "all");

// 重試次數(shù) retries,默認(rèn)是 int 最大值,2147483647

properties.put(ProducerConfig.RETRIES_CONFIG, 3);

消費(fèi)者

消費(fèi)者消費(fèi)一個(gè)分區(qū)中的數(shù)據(jù)時(shí),會(huì)跟蹤他們自己消費(fèi)到的偏移量,Kafka 會(huì)定期將偏移量提交到 Kafka 主題中的特殊主題(__consumer_offsets)中,這樣,消費(fèi)者如果停止或重新啟動(dòng)后,會(huì)從上次的偏移量繼續(xù)消費(fèi)。偏移量是每條消息在分區(qū)中的位置。

消費(fèi)者組

由多個(gè)kafka消費(fèi)者組成的一組消費(fèi)者組,用于同時(shí)消費(fèi)處理kafka中一個(gè)主題所有分區(qū)中的數(shù)據(jù)。只要在創(chuàng)建kafka消費(fèi)者的時(shí)候?qū)roup id設(shè)置成一樣的,那么就可以創(chuàng)建多個(gè)消費(fèi)者構(gòu)成消費(fèi)者組了。

一個(gè)主題的一個(gè)分區(qū)只能由一個(gè)消費(fèi)者組內(nèi)的一個(gè)消費(fèi)者處理,否則會(huì)導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi)。一個(gè)消費(fèi)者組的每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù)。

消費(fèi)者組的好處:加快消費(fèi)處理數(shù)據(jù)的速度,橫向提高整個(gè)消費(fèi)能力。如下圖,一開(kāi)始就一個(gè)消費(fèi)者c1,他要自己一個(gè)人消費(fèi)處理來(lái)自topicA主題的四個(gè)分區(qū)的數(shù)據(jù),而我們可以增加三個(gè)消費(fèi)者c2、c3、c4和c1構(gòu)成一個(gè)消費(fèi)者組來(lái)同時(shí)消費(fèi)處理topicA主題的四個(gè)分區(qū)的數(shù)據(jù),這樣消費(fèi)處理數(shù)據(jù)的速度就提升了。(前提是有多個(gè)分區(qū))

主題

上面那樣肯定不好,各種消息的的生產(chǎn)者(生產(chǎn)圓蛋蛋、生產(chǎn)方框框、生產(chǎn)小心心)將消息都發(fā)給kafka,然后kafka將消息都分類(lèi),每種分類(lèi)都有相應(yīng)的主題,然后消費(fèi)者根據(jù)需要訂閱相應(yīng)的主題。就能收到對(duì)應(yīng)的消息。

分區(qū)

如果一個(gè)主題的消息比較多,就可以考慮分區(qū),分區(qū)可以分布在不同的服務(wù)器上,所以主題也可以分布在不同的服務(wù)器上,這樣比單服務(wù)器處理快。

如果生成者沒(méi)有指定分區(qū),分區(qū)器就會(huì)根據(jù)每條消息的鍵算出消息該去哪個(gè)分區(qū)。鍵:就是每條消息的一個(gè)標(biāo)記,決定了消息該去哪個(gè)分區(qū)。分區(qū)器:就是一個(gè)算法,算消息該去哪個(gè)分區(qū),輸入是鍵,輸出是消息去的分區(qū)。

偏移量

偏移量就是消息在每個(gè)分區(qū)中的位置,kafka在收到消息的時(shí)候,會(huì)為每個(gè)消息設(shè)置偏移量,然后將消息存到磁盤(pán)中。

消費(fèi)者只能按順序消費(fèi)讀取。消費(fèi)者如果要分區(qū)0的第四個(gè),kafka就會(huì)說(shuō)第三個(gè)還沒(méi)讀取,不給第四個(gè)。

kafka集群

一個(gè)broker就是一個(gè)kafka服務(wù)器。下面有兩個(gè)broker構(gòu)成了kafka集群,他們的數(shù)據(jù)通過(guò)復(fù)制同步,當(dāng)有一個(gè)kafka宕機(jī)了,另一臺(tái)就可以先頂上,保證了kafka的可靠性。

監(jiān)控kafka

這個(gè)前提得先安裝jdk

1、修改kafka的啟動(dòng)腳本

vim bin/kafka-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

fi

改為

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G

-XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200

-XX:ParallelGCThreads=8 -XX:ConcGCThreads=5

-XX:InitiatingHeapOccupancyPercent=70"

export JMX_PORT="9999"

#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

fi

修改kafka進(jìn)程信息:

-Xms2G:設(shè)置 Kafka 進(jìn)程的初始堆內(nèi)存大小為 2 GB。

-Xmx2G:設(shè)置 Kafka 進(jìn)程的最大堆內(nèi)存大小為 2 GB。

XX:PermSize=128m:設(shè)置持久代(PermGen)的初始大小為 128 MB。請(qǐng)注意,這個(gè)選項(xiàng)在 Java 8 和更新的版本中不再適用,因?yàn)?PermGen 已被 Metaspace 取代。

-XX:+UseG1GC:指定使用 G1 垃圾收集器。

-XX:MaxGCPauseMillis=200:設(shè)置最大垃圾收集暫停時(shí)間為 200 毫秒。

XX:ParallelGCThreads=8:設(shè)置并行垃圾收集線(xiàn)程的數(shù)量為 8。

XX:ConcGCThreads=5:設(shè)置并發(fā)垃圾收集線(xiàn)程的數(shù)量為 5。

XX:InitiatingHeapOccupancyPercent=70:設(shè)置堆內(nèi)存占用百分比,當(dāng)堆內(nèi)存使用達(dá)到 70% 時(shí),啟動(dòng)并發(fā)垃圾收集。

這些參數(shù)的目的是調(diào)整 Kafka 進(jìn)程的性能和垃圾收集行為,以滿(mǎn)足特定的性能需求。請(qǐng)注意,這些參數(shù)的值可以根據(jù)你的 Kafka 部署和硬件資源進(jìn)行調(diào)整。堆內(nèi)存的大小和垃圾收集器的選擇將影響 Kafka 的性能和穩(wěn)定性。

最后,這段腳本還設(shè)置了 JMX 端口為 9999,這是用于監(jiān)控 Kafka 進(jìn)程的 Java Management Extensions(JMX)端口。通過(guò)此端口,你可以使用 JMX 工具監(jiān)控 Kafka 進(jìn)程的性能指標(biāo)和狀態(tài)。如果需要監(jiān)控 Kafka,你可以使用 JMX 工具連接到此端口。

2、官網(wǎng)下載安裝包

https://www.kafka-eagle.org/

3、上傳解壓

第一次解壓后,里面有個(gè)壓縮包再解壓才是真正的。

/opt/module/efak/conf/system-config.properties

5、配置環(huán)境變量

$ sudo vim /etc/profile.d/my_env.sh

# kafkaEFAK

export KE_HOME=/opt/module/efak

export PATH=$PATH:$KE_HOME/bin

source /etc/profile

6、啟動(dòng)

/bin/kf.sh start

壓力測(cè)試

# 單Kafka服務(wù)器,生成者發(fā)送1000000條數(shù)據(jù),每條大小1k,總共發(fā)送大約

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=linjl:9092 batch.size=16384 linger.ms=0

batch.size=16384 linger.ms=0 9.76 MB/sec

record-size 是一條信息有多大,單位是字節(jié),本次測(cè)試設(shè)置為 1k。

BUG

1、Error while fetching metadata with correlation id : {LEADER_NOT_AVAILABLE}

2、

[root@linjl kafka_2.12-3.0.0]# ./bin/kafka-console-consumer.sh --topic quickstart-events --bootstrap-server linjl:9092

[2023-09-13 16:51:54,710] WARN [Consumer clientId=consumer-console-consumer-32025-1, groupId=console-consumer-32025] Error while fetching metadata with correlation id 2 : {quickstart-events=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

這個(gè)警告消息 “Error while fetching metadata with correlation id 2 : {quickstart-events=LEADER_NOT_AVAILABLE}” 表示 Kafka 消費(fèi)者在嘗試訂閱主題 “quickstart-events” 時(shí)遇到了 “LEADER_NOT_AVAILABLE” 錯(cuò)誤。這個(gè)錯(cuò)誤通常表示消費(fèi)者無(wú)法找到主題的 leader 分區(qū),因此它無(wú)法讀取消息。

我的猜想: 可能是因?yàn)?Kafka 服務(wù)器無(wú)法從 ZooKeeper 獲取到有關(guān) “quickstart-events” 主題的元數(shù)據(jù)信息,包括分區(qū)的 Leader 信息。

3、Received invalid metadata error in produce request on partition quickstart-events-0

due to org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk… Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)

表示在嘗試將消息寫(xiě)入分區(qū) “quickstart-events-0” 時(shí),Kafka 生產(chǎn)者遇到了磁盤(pán)錯(cuò)誤,無(wú)法訪(fǎng)問(wèn)日志文件。這個(gè)錯(cuò)誤通常與磁盤(pán)故障或磁盤(pán)空間不足有關(guān)。

4、Java客戶(hù)端創(chuàng)建生產(chǎn)者,發(fā)送消息給kafka沒(méi)響應(yīng)。

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.128:9092");

// key,value 序列化(必須):key.serializer,value.serializer

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())

網(wǎng)絡(luò)連接都能通,而且防火墻也都關(guān)了。

解決:在server.properties配置文件中配置

# The address the socket server listens on. If not configured, the host name will be equal to the value of

# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.

# FORMAT:

# listeners = listener_name://host_name:port

# EXAMPLE:

# listeners = PLAINTEXT://your.host.name:9092

listeners=PLAINTEXT://192.168.239.128:9092

kafka和flink結(jié)合案例

數(shù)據(jù)寫(xiě)入kafka,flink訂閱消費(fèi)

安裝kafka單服務(wù)

1、官方下載地址:http://kafka.apache.org/downloads.html

2、解壓安裝包

下載完將安裝包上傳到centos中,然后解壓

$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

3、 修改解壓后的文件名稱(chēng)

$ mv kafka_2.12-3.0.0/ kafka

4、進(jìn)入到/opt/module/kafka 目錄,修改配置文件

$ cd config/

$ vim server.properties

#broker 的全局唯一編號(hào),不能重復(fù),只能是數(shù)字。

broker.id=0

#處理網(wǎng)絡(luò)請(qǐng)求的線(xiàn)程數(shù)量

num.network.threads=3

#用來(lái)處理磁盤(pán) IO 的線(xiàn)程數(shù)量

num.io.threads=8

#發(fā)送套接字的緩沖區(qū)大小

socket.send.buffer.bytes=102400

#接收套接字的緩沖區(qū)大小

socket.receive.buffer.bytes=102400

#請(qǐng)求套接字的緩沖區(qū)大小

socket.request.max.bytes=104857600

#kafka 運(yùn)行日志(數(shù)據(jù))存放的路徑,路徑不需要提前創(chuàng)建,kafka 自動(dòng)幫你創(chuàng)建,可以配置多個(gè)磁盤(pán)路徑,路徑與路徑之間可以用","分隔

log.dirs=/opt/module/kafka/datas

#topic 在當(dāng)前 broker 上的分區(qū)個(gè)數(shù)

num.partitions=1

#用來(lái)恢復(fù)和清理 data 下數(shù)據(jù)的線(xiàn)程數(shù)量

num.recovery.threads.per.data.dir=1

# 每個(gè) topic 創(chuàng)建時(shí)的副本數(shù),默認(rèn)時(shí) 1 個(gè)副本

offsets.topic.replication.factor=1

#segment 文件保留的最長(zhǎng)時(shí)間,超時(shí)將被刪除

log.retention.hours=168

#每個(gè) segment 文件的大小,默認(rèn)最大 1G

log.segment.bytes=1073741824

# 檢查過(guò)期數(shù)據(jù)的時(shí)間,默認(rèn) 5 分鐘檢查一次是否數(shù)據(jù)過(guò)期

log.retention.check.interval.ms=300000

#配置連接Zookeeper 集群地址(在 zk 根目錄下創(chuàng)建/kafka,方便管理)

zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

5、配置kafka環(huán)境變量

vim /etc/profile.d/my_env.sh

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME/bin

刷新

$ source /etc/profile

6、啟動(dòng)kafka

./kafka/bin/kafka-server-start.sh

創(chuàng)建生產(chǎn)者,將數(shù)據(jù)寫(xiě)入kafka

柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記

http://yzkb.51969.com/

文章鏈接

評(píng)論可見(jiàn),查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18494232.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪(fǎng)問(wèn)

文章目錄