欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記

柚子快報邀請碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記

http://yzkb.51969.com/

Kafka

Kafka 是一個分布式的基于發(fā)布/訂閱模式的消息隊列(Message Queue),主要應(yīng)用于大數(shù)據(jù)實時處理領(lǐng)域。

Kafka可以用作Flink應(yīng)用程序的數(shù)據(jù)源。Flink可以輕松地從一個或多個Kafka主題中消費數(shù)據(jù)流。這意味著您可以使用Kafka來捕獲和傳輸實時數(shù)據(jù),并將其發(fā)送到Flink進行進一步處理。

Flink和Kafka在實時數(shù)據(jù)處理和流處理應(yīng)用程序中通常協(xié)同工作,Kafka用于數(shù)據(jù)傳輸和捕獲,而Flink用于數(shù)據(jù)處理和分析。

Kafka由 生產(chǎn)者 Broker 消費者組成,生產(chǎn)者和消費者是由Java語言編寫的,Broker由Scala語言寫的。

基礎(chǔ)架構(gòu)

Producer:kafka 生產(chǎn)者,用于接收外部數(shù)據(jù),然后將數(shù)據(jù)發(fā)送給kafka集群存儲,假如要發(fā)100T的數(shù)據(jù)。Consumer:消費者,就是到kafka中取數(shù)據(jù)的客戶端,比如:flink就是一個消費者,到kafka中取出數(shù)據(jù)計算處理。Broker:一個Broker就是一個kafka服務(wù)器,如果你在一個虛擬機上安裝了kafka,那么這個虛擬機就是一個Broker。Partition:分區(qū)。前面說了,要發(fā)送100T的數(shù)據(jù)給kafka,那如果只用一臺kafka服務(wù)器(Broker)接收肯定不好,太大了。所以就有了kafka集群一起處理,這100T的數(shù)據(jù)是一個主題,太大了,就考慮分區(qū),分成3個區(qū),每個分區(qū)分到不同的kafka服務(wù)器上,一個分區(qū)存33T。Consumer Group:消費者組。由多個消費者組成,消費者就是來取kafka中的數(shù)據(jù)來處理使用的?,F(xiàn)在kafka已經(jīng)存儲了100T的數(shù)據(jù),假如一個消費者來取使用,肯定比較慢,所以就可以引入多個消費者一起來取數(shù)據(jù)處理。一個分區(qū)中的數(shù)據(jù)只能由一個消費者Replica:副本。每個分區(qū)可以設(shè)置一個或多個副本,我理解是副本會同步主分區(qū)中的數(shù)據(jù),假如主分區(qū)掛了,副本就可以頂上去了。Leader:領(lǐng)導(dǎo)。主分區(qū),所有副本分區(qū)中的主分區(qū),生產(chǎn)者和消費者都只操作主分區(qū)。Follower:除了主分區(qū),其他的副本分區(qū)都是Follower,F(xiàn)ollower會從Leader中同步數(shù)據(jù),當Leader掛了,某個Follower會成為新的Leader。zookeeeper: ZooKeeper 用于協(xié)調(diào)和管理 Kafka 集群的各個組件,包括 Broker、Topic 配置、分區(qū)分配、Leader 選舉等。Kafka 使用 ZooKeeper 來維護集群的整體狀態(tài)和配置信息,以確保各個組件之間的協(xié)同工作。

生產(chǎn)者

在消息發(fā)送的過程中,涉及到了兩個線程——main?線程和?Sender?線程。在 main 線程中創(chuàng)建了一個雙端隊列?RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator, Sender 線程不斷從RecordAccumulator 中拉取消息發(fā)送到Kafka Broker。

生產(chǎn)者的main線程

main線程先創(chuàng)建Producer對象,然后調(diào)用send方法,數(shù)據(jù)會經(jīng)過攔截器,進行過濾處理,如果不需要可以不設(shè)置攔截器,攔截器用的較少。接著經(jīng)過序列化器對數(shù)據(jù)進行序列化(在網(wǎng)絡(luò)中傳輸數(shù)據(jù)需要序列化將數(shù)據(jù)轉(zhuǎn)成通用的字節(jié)流便于網(wǎng)絡(luò)傳輸),然后經(jīng)過分區(qū)器,分區(qū)器決定每條數(shù)據(jù)要發(fā)往哪個分區(qū),然后將每條數(shù)據(jù)發(fā)給對應(yīng)的分區(qū),一個分區(qū)對應(yīng)一個DQuee(雙端隊列),隊列中會有一批一批數(shù)據(jù),一批數(shù)據(jù)默認大小是16k。

總的來說,main線程將數(shù)據(jù)發(fā)到RecordAccumulator記錄累加器中,默認大小是32m,這個是在內(nèi)存中,起到緩存的作用,將大量的數(shù)據(jù)一批一批發(fā)給kafka,提高網(wǎng)絡(luò)傳輸速率。累加器使用有限的內(nèi)存,當內(nèi)存耗盡時(生成者產(chǎn)生數(shù)據(jù)的速度超過發(fā)送給服務(wù)器的速度),追加調(diào)用將阻塞,除非顯式禁用此行為。

sender線程,負責(zé)將數(shù)據(jù)發(fā)給kafka。數(shù)據(jù)是分批次發(fā)給kafka,當一個批次的數(shù)據(jù)達到16k或等待的時間達到linger.ms設(shè)置的時間,一個批次的數(shù)據(jù)就會被sender發(fā)給kafka,一個批次就是分區(qū)隊列中那個小正方形。

sender發(fā)送數(shù)據(jù):broker1(request1,request2,request3,request4,request5),每個kafka節(jié)點維護一個發(fā)送數(shù)據(jù)的請求緩存,這個請求緩存最多緩存5個請求,如果請求發(fā)送失敗了,會使用后面的請求繼續(xù)發(fā)。批數(shù)據(jù)到達對應(yīng)的broker后,會先同步副本。

生產(chǎn)者分區(qū)

分區(qū)好處:

1)便于合理使用存儲資源,每個Partition在一個Broker上存儲,可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)存儲在多臺Broker上。合理控制分區(qū)的任務(wù),可以實現(xiàn)負載均衡的效果。

2)提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費者可以以分區(qū)為單位進行消費數(shù)據(jù),從而提高消費處理數(shù)據(jù)的速度。

生產(chǎn)者發(fā)送消息的分區(qū)策略

ProducerRecord是生產(chǎn)者發(fā)送數(shù)據(jù)的單位

自定義分區(qū)器

也可以自定義分區(qū)器,自己決定數(shù)據(jù)要發(fā)到哪個分區(qū)中

import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;

import java.util.Map;

/**

發(fā)送過來的數(shù)據(jù)中如果包含 atguigu,就發(fā)往 0 號分區(qū),不包含 atguigu,就發(fā)往 1 號分區(qū)

定義類實現(xiàn) Partitioner 接口,重寫 partition()方法。

* 1. 實現(xiàn)接口 Partitioner

* 2. 實現(xiàn) 3 個方法:partition,close,configure

* 3. 編寫 partition 方法,返回分區(qū)號

*/

public class MyPartitioner implements Partitioner {

/**

* 返回信息對應(yīng)的分區(qū)

* @param topic 主題

* @param key 消息的 key

* @param keyBytes 消息的 key 序列化后的字節(jié)數(shù)組

* @param value 消息的 value

* @param valueBytes 消息的 value 序列化后的字節(jié)數(shù)組

* @param cluster 集群元數(shù)據(jù)可以查看分區(qū)信息

* @return

*/

@Override

public int partition(String topic, Object key, byte[]

keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

// 獲取消息

String msgValue = value.toString();

// 創(chuàng)建 partition

int partition;

// 判斷消息是否包含 atguigu

if (msgValue.contains("atguigu")){

partition = 0;

}else {

partition = 1;

}

// 返回分區(qū)號

return partition;

}

// 關(guān)閉資源

@Override

public void close() {

}

// 配置方法

@Override

public void configure(Map configs) {

}

}

然后在生產(chǎn)者配置里加上自定義分區(qū)器

// 添加自定義分區(qū)器

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");

生產(chǎn)者如何提高吞吐量

如何提高生產(chǎn)者發(fā)送數(shù)據(jù)的速度,主要是調(diào)整以下四個參數(shù)

? batch.size:批次大小,默認16k

? linger.ms:等待時間,修改為5-100ms

默認是0ms,就是數(shù)據(jù)一到隊列中就發(fā)給broker,這樣的好處就是實時性好,但是效率低,一次發(fā)幾條數(shù)據(jù)總比一次發(fā)一條效率高。也不能改太大,太大時效性不好。

? compression.type:壓縮snappy

壓縮數(shù)據(jù),這樣一批次就可以存更多的數(shù)據(jù)

? RecordAccumulator:緩沖區(qū)大小,可修改為64m

生產(chǎn)者數(shù)據(jù)可靠性

主要是當broker收到數(shù)據(jù)后的應(yīng)答機制

ISR隊列是只一個分區(qū)的Leader和所有的Followers的集合,ISR(0,1,2),為了解決那個問題,如果Leader長時間沒收到某個Follower同步數(shù)據(jù)的請求,就會認為這個Follower故障了,就會從ISR隊列中踢出這個Follower,ISR(0,1)。

如果Follower長時間未向Leader發(fā)送通信請求或同步數(shù)據(jù),則 該Follower將被踢出ISR。該時間閾值由replica.lag.time.max.ms參 數(shù)設(shè)定,默認30s。例如2超時,(leader:0, isr:0,1)。

數(shù)據(jù)完全可靠條件 = ACK級別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2

數(shù)據(jù)可靠性越強,效率越慢

// 設(shè)置 acks

properties.put(ProducerConfig.ACKS_CONFIG, "all");

// 重試次數(shù) retries,默認是 int 最大值,2147483647

properties.put(ProducerConfig.RETRIES_CONFIG, 3);

消費者

消費者消費一個分區(qū)中的數(shù)據(jù)時,會跟蹤他們自己消費到的偏移量,Kafka 會定期將偏移量提交到 Kafka 主題中的特殊主題(__consumer_offsets)中,這樣,消費者如果停止或重新啟動后,會從上次的偏移量繼續(xù)消費。偏移量是每條消息在分區(qū)中的位置。

消費者組

由多個kafka消費者組成的一組消費者組,用于同時消費處理kafka中一個主題所有分區(qū)中的數(shù)據(jù)。只要在創(chuàng)建kafka消費者的時候?qū)roup id設(shè)置成一樣的,那么就可以創(chuàng)建多個消費者構(gòu)成消費者組了。

一個主題的一個分區(qū)只能由一個消費者組內(nèi)的一個消費者處理,否則會導(dǎo)致數(shù)據(jù)重復(fù)消費。一個消費者組的每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù)。

消費者組的好處:加快消費處理數(shù)據(jù)的速度,橫向提高整個消費能力。如下圖,一開始就一個消費者c1,他要自己一個人消費處理來自topicA主題的四個分區(qū)的數(shù)據(jù),而我們可以增加三個消費者c2、c3、c4和c1構(gòu)成一個消費者組來同時消費處理topicA主題的四個分區(qū)的數(shù)據(jù),這樣消費處理數(shù)據(jù)的速度就提升了。(前提是有多個分區(qū))

主題

上面那樣肯定不好,各種消息的的生產(chǎn)者(生產(chǎn)圓蛋蛋、生產(chǎn)方框框、生產(chǎn)小心心)將消息都發(fā)給kafka,然后kafka將消息都分類,每種分類都有相應(yīng)的主題,然后消費者根據(jù)需要訂閱相應(yīng)的主題。就能收到對應(yīng)的消息。

分區(qū)

如果一個主題的消息比較多,就可以考慮分區(qū),分區(qū)可以分布在不同的服務(wù)器上,所以主題也可以分布在不同的服務(wù)器上,這樣比單服務(wù)器處理快。

如果生成者沒有指定分區(qū),分區(qū)器就會根據(jù)每條消息的鍵算出消息該去哪個分區(qū)。鍵:就是每條消息的一個標記,決定了消息該去哪個分區(qū)。分區(qū)器:就是一個算法,算消息該去哪個分區(qū),輸入是鍵,輸出是消息去的分區(qū)。

偏移量

偏移量就是消息在每個分區(qū)中的位置,kafka在收到消息的時候,會為每個消息設(shè)置偏移量,然后將消息存到磁盤中。

消費者只能按順序消費讀取。消費者如果要分區(qū)0的第四個,kafka就會說第三個還沒讀取,不給第四個。

kafka集群

一個broker就是一個kafka服務(wù)器。下面有兩個broker構(gòu)成了kafka集群,他們的數(shù)據(jù)通過復(fù)制同步,當有一個kafka宕機了,另一臺就可以先頂上,保證了kafka的可靠性。

監(jiān)控kafka

這個前提得先安裝jdk

1、修改kafka的啟動腳本

vim bin/kafka-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

fi

改為

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G

-XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200

-XX:ParallelGCThreads=8 -XX:ConcGCThreads=5

-XX:InitiatingHeapOccupancyPercent=70"

export JMX_PORT="9999"

#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

fi

修改kafka進程信息:

-Xms2G:設(shè)置 Kafka 進程的初始堆內(nèi)存大小為 2 GB。

-Xmx2G:設(shè)置 Kafka 進程的最大堆內(nèi)存大小為 2 GB。

XX:PermSize=128m:設(shè)置持久代(PermGen)的初始大小為 128 MB。請注意,這個選項在 Java 8 和更新的版本中不再適用,因為 PermGen 已被 Metaspace 取代。

-XX:+UseG1GC:指定使用 G1 垃圾收集器。

-XX:MaxGCPauseMillis=200:設(shè)置最大垃圾收集暫停時間為 200 毫秒。

XX:ParallelGCThreads=8:設(shè)置并行垃圾收集線程的數(shù)量為 8。

XX:ConcGCThreads=5:設(shè)置并發(fā)垃圾收集線程的數(shù)量為 5。

XX:InitiatingHeapOccupancyPercent=70:設(shè)置堆內(nèi)存占用百分比,當堆內(nèi)存使用達到 70% 時,啟動并發(fā)垃圾收集。

這些參數(shù)的目的是調(diào)整 Kafka 進程的性能和垃圾收集行為,以滿足特定的性能需求。請注意,這些參數(shù)的值可以根據(jù)你的 Kafka 部署和硬件資源進行調(diào)整。堆內(nèi)存的大小和垃圾收集器的選擇將影響 Kafka 的性能和穩(wěn)定性。

最后,這段腳本還設(shè)置了 JMX 端口為 9999,這是用于監(jiān)控 Kafka 進程的 Java Management Extensions(JMX)端口。通過此端口,你可以使用 JMX 工具監(jiān)控 Kafka 進程的性能指標和狀態(tài)。如果需要監(jiān)控 Kafka,你可以使用 JMX 工具連接到此端口。

2、官網(wǎng)下載安裝包

https://www.kafka-eagle.org/

3、上傳解壓

第一次解壓后,里面有個壓縮包再解壓才是真正的。

/opt/module/efak/conf/system-config.properties

5、配置環(huán)境變量

$ sudo vim /etc/profile.d/my_env.sh

# kafkaEFAK

export KE_HOME=/opt/module/efak

export PATH=$PATH:$KE_HOME/bin

source /etc/profile

6、啟動

/bin/kf.sh start

壓力測試

# 單Kafka服務(wù)器,生成者發(fā)送1000000條數(shù)據(jù),每條大小1k,總共發(fā)送大約

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=linjl:9092 batch.size=16384 linger.ms=0

batch.size=16384 linger.ms=0 9.76 MB/sec

record-size 是一條信息有多大,單位是字節(jié),本次測試設(shè)置為 1k。

BUG

1、Error while fetching metadata with correlation id : {LEADER_NOT_AVAILABLE}

2、

[root@linjl kafka_2.12-3.0.0]# ./bin/kafka-console-consumer.sh --topic quickstart-events --bootstrap-server linjl:9092

[2023-09-13 16:51:54,710] WARN [Consumer clientId=consumer-console-consumer-32025-1, groupId=console-consumer-32025] Error while fetching metadata with correlation id 2 : {quickstart-events=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

這個警告消息 “Error while fetching metadata with correlation id 2 : {quickstart-events=LEADER_NOT_AVAILABLE}” 表示 Kafka 消費者在嘗試訂閱主題 “quickstart-events” 時遇到了 “LEADER_NOT_AVAILABLE” 錯誤。這個錯誤通常表示消費者無法找到主題的 leader 分區(qū),因此它無法讀取消息。

我的猜想: 可能是因為 Kafka 服務(wù)器無法從 ZooKeeper 獲取到有關(guān) “quickstart-events” 主題的元數(shù)據(jù)信息,包括分區(qū)的 Leader 信息。

3、Received invalid metadata error in produce request on partition quickstart-events-0

due to org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk… Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)

表示在嘗試將消息寫入分區(qū) “quickstart-events-0” 時,Kafka 生產(chǎn)者遇到了磁盤錯誤,無法訪問日志文件。這個錯誤通常與磁盤故障或磁盤空間不足有關(guān)。

4、Java客戶端創(chuàng)建生產(chǎn)者,發(fā)送消息給kafka沒響應(yīng)。

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.128:9092");

// key,value 序列化(必須):key.serializer,value.serializer

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())

網(wǎng)絡(luò)連接都能通,而且防火墻也都關(guān)了。

解決:在server.properties配置文件中配置

# The address the socket server listens on. If not configured, the host name will be equal to the value of

# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.

# FORMAT:

# listeners = listener_name://host_name:port

# EXAMPLE:

# listeners = PLAINTEXT://your.host.name:9092

listeners=PLAINTEXT://192.168.239.128:9092

kafka和flink結(jié)合案例

數(shù)據(jù)寫入kafka,flink訂閱消費

安裝kafka單服務(wù)

1、官方下載地址:http://kafka.apache.org/downloads.html

2、解壓安裝包

下載完將安裝包上傳到centos中,然后解壓

$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

3、 修改解壓后的文件名稱

$ mv kafka_2.12-3.0.0/ kafka

4、進入到/opt/module/kafka 目錄,修改配置文件

$ cd config/

$ vim server.properties

#broker 的全局唯一編號,不能重復(fù),只能是數(shù)字。

broker.id=0

#處理網(wǎng)絡(luò)請求的線程數(shù)量

num.network.threads=3

#用來處理磁盤 IO 的線程數(shù)量

num.io.threads=8

#發(fā)送套接字的緩沖區(qū)大小

socket.send.buffer.bytes=102400

#接收套接字的緩沖區(qū)大小

socket.receive.buffer.bytes=102400

#請求套接字的緩沖區(qū)大小

socket.request.max.bytes=104857600

#kafka 運行日志(數(shù)據(jù))存放的路徑,路徑不需要提前創(chuàng)建,kafka 自動幫你創(chuàng)建,可以配置多個磁盤路徑,路徑與路徑之間可以用","分隔

log.dirs=/opt/module/kafka/datas

#topic 在當前 broker 上的分區(qū)個數(shù)

num.partitions=1

#用來恢復(fù)和清理 data 下數(shù)據(jù)的線程數(shù)量

num.recovery.threads.per.data.dir=1

# 每個 topic 創(chuàng)建時的副本數(shù),默認時 1 個副本

offsets.topic.replication.factor=1

#segment 文件保留的最長時間,超時將被刪除

log.retention.hours=168

#每個 segment 文件的大小,默認最大 1G

log.segment.bytes=1073741824

# 檢查過期數(shù)據(jù)的時間,默認 5 分鐘檢查一次是否數(shù)據(jù)過期

log.retention.check.interval.ms=300000

#配置連接Zookeeper 集群地址(在 zk 根目錄下創(chuàng)建/kafka,方便管理)

zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

5、配置kafka環(huán)境變量

vim /etc/profile.d/my_env.sh

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME/bin

刷新

$ source /etc/profile

6、啟動kafka

./kafka/bin/kafka-server-start.sh

創(chuàng)建生產(chǎn)者,將數(shù)據(jù)寫入kafka

柚子快報邀請碼778899分享:大數(shù)據(jù)-kafka學(xué)習(xí)筆記

http://yzkb.51969.com/

文章鏈接

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18494232.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機訪問

文章目錄