欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)激活碼778899分享:學(xué)習(xí)筆記 | Kafka

柚子快報(bào)激活碼778899分享:學(xué)習(xí)筆記 | Kafka

http://yzkb.51969.com/

一、概述

定義

1、Kafka傳統(tǒng)定義:Kafka 是一個(gè)分布式的基于 發(fā)布/訂閱模式 的消息隊(duì)列(Message Queue) ,主要應(yīng)用與大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。

2、發(fā)布/訂閱:消息的發(fā)送者不會(huì)將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息分為不同的類別,訂閱者只接受自己感興趣的消息。

3、Kafka 最新定義:Kafka是一個(gè)開源的 分布式事件流平臺 (Event Streaming Platfrom),被數(shù)千家公司用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用。

消息隊(duì)列的應(yīng)用場景

傳統(tǒng)的消息隊(duì)列主要應(yīng)用場景包括: 緩存/削峰、解耦和異步通信。

緩存/削峰

所有數(shù)據(jù)可以全部緩存到消息隊(duì)列,服務(wù)器可以根據(jù)自己處理的性能按一定的頻率去消息隊(duì)列中取。

解耦

減少服務(wù)之間的直接調(diào)用,由消息隊(duì)列充當(dāng)中間者。

異步通信

一個(gè)業(yè)務(wù)可以將優(yōu)化體驗(yàn)(發(fā)短信)的動(dòng)作放到消息隊(duì)列中,由專門的服務(wù)去處理,達(dá)到快速響應(yīng)上游。

消息隊(duì)列的倆種模式

1)點(diǎn)對點(diǎn)模式

消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后清除數(shù)據(jù)。

2)發(fā)布/訂閱模式

一個(gè)隊(duì)列可以有多個(gè)topic主題。(topic對消息進(jìn)行分類,消費(fèi)者可以自己需求拿消息)消費(fèi)者消費(fèi)數(shù)據(jù)之后,不刪除數(shù)據(jù)。每個(gè)消費(fèi)者相互獨(dú)立,都可以拿到消費(fèi)數(shù)據(jù)。

Kafka的基礎(chǔ)架構(gòu)

1、為方便擴(kuò)展,并提高吞吐量,一個(gè) Topic 分為多個(gè) partition(分區(qū))

2、配合分區(qū)的設(shè)計(jì),提出了消費(fèi)者組的概念,組內(nèi)每個(gè)消費(fèi)者并行消費(fèi),一個(gè)分區(qū)只能讓一個(gè)消費(fèi)者消費(fèi)。

3、為了提高可用性,為每個(gè) partition 增加諾干副本進(jìn)行備份(分為leader 和 follower)消費(fèi)者只找learder,當(dāng)leader掛掉的時(shí)候,follower符合條件時(shí)會(huì)變成leader。

4、zookerper存儲節(jié)點(diǎn)信息,有哪些副本。

二、入門

Kafka的基本命令

Topic命令

查看有多少主題

kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --list

新增主題

kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --topic first --create --partitions 1 --replication-factor 3

查看主題詳情

kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --topic first --describe

修改主題

只能加不能減

kafka-topics.sh --bootstrap-server 192.168.204.10:9092,192.168.204.10:9093 --topic first --alter --partitions 3

命令行操作

創(chuàng)建一個(gè)生產(chǎn)者

kafka-console-producer.sh --bootstrap-server 192.168.204.10:9092 --topic second

創(chuàng)建一個(gè)消費(fèi)者

kafka-console-consumer.sh --bootstrap-server 192.168.204.10:9092 --topic second

可以查看到歷史數(shù)據(jù)

kafka-console-consumer.sh --bootstrap-server 192.168.204.10:9092 --topic second --from-beginning

三、生產(chǎn)者

原理

在消息發(fā)送的過程中,涉及到了倆個(gè)線程 -- main 和 Sender。在main線程中創(chuàng)建了 一個(gè)雙端隊(duì)列 RecordAccumulator 。main線程將消息發(fā)送給RecordAccumulator ,Sender 線程不斷從RecordAccumulator 中拉取消息發(fā)送給Kafka Broker。

異步發(fā)送

當(dāng)main線程發(fā)送到RecordAccumulator之后就結(jié)束了,不管接下去的操作。

示例代碼:

//配置參數(shù)

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//創(chuàng)建KafkaProducer

KafkaProducer kafkaProducer = new KafkaProducer<>(properties);

kafkaProducer.send(new ProducerRecord<>("second","hello"));

//釋放資源

kafkaProducer.close();

回調(diào)異步發(fā)送

相對于異步發(fā)送,就是多了一個(gè)發(fā)送成功之后處理的函數(shù)。

示例代碼:

//配置

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//創(chuàng)客KafkaProducer

KafkaProducer kafkaProducer = new KafkaProducer<>(properties);

kafkaProducer.send(new ProducerRecord<>("second", "hello"), (recordMetadata, e) -> {

System.out.println(recordMetadata.toString());

System.out.println("send success");

});

//釋放資源

kafkaProducer.close();

同步發(fā)送

同步發(fā)送就是main線程需要等sender線程將雙端隊(duì)列中的數(shù)據(jù)發(fā)送出去才能繼續(xù)往下面操作。

示例代碼:

//配置參數(shù)

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//創(chuàng)建KafkaProducer

KafkaProducer kafkaProducer = new KafkaProducer<>(properties);

try {

kafkaProducer.send(new ProducerRecord<>("second","hello")).get();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

//釋放資源

kafkaProducer.close();

分區(qū)

Kafka分區(qū)好處

1、便于合理使用存儲資源,每個(gè)Partition 在一個(gè)Broker上存儲,可以把海量數(shù)據(jù)按照分區(qū)切割成一塊一塊存儲在多臺Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果。

2、提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù)。

分區(qū)策略

自定義分區(qū)器

1、定義自己的分區(qū)器

package cn.swj.kafka;

import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;

import java.util.Map;

/**

* @Author suweijie

* @Date 2023/8/30 21:40

* @Description: TODO

* @Version 1.0

*/

public class MyPartitioner implements Partitioner {

@Override

public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {

String msg = o1.toString();

if(msg.contains("suweijie")) {

return 1;

}

return 0;

}

@Override

public void close() {

}

@Override

public void configure(Map map) {

}

}

2、添加配置

//配置參數(shù)

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//自定義分區(qū)器

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName())

//創(chuàng)建KafkaProducer

KafkaProducer kafkaProducer = new KafkaProducer<>(properties);

try {

kafkaProducer.send(new ProducerRecord<>("second","hello")).get();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

//釋放資源

kafkaProducer.close();

提高生產(chǎn)者的吞吐量

batch.size: 批次的大小默認(rèn)是16k(16384b) ,但是這個(gè)參數(shù)要跟linger.ms 配合才有用

linger.ms: 等待時(shí)間,修改為 5-100ms ,修改這個(gè)會(huì)造成數(shù)據(jù)的延遲。

RecordAccumulator: 雙端隊(duì)列的緩存區(qū)大小,修改為64m (33554432b)

compression.type : 壓縮snappy, none(默認(rèn))、gzip、snappy(用的比較多)、lz4、zstd

最佳實(shí)踐:

batch.size = 32768

linger.ms = 5

buffer.memory = 33554432

compression.type = snappy

數(shù)據(jù)可靠性

應(yīng)答ACKS

0: 生產(chǎn)者發(fā)過來的數(shù)據(jù),不需要等待數(shù)據(jù)落盤應(yīng)答。1: 生產(chǎn)者發(fā)過來的數(shù)據(jù),需要等待Leader收到之后應(yīng)答。-1(all): 生產(chǎn)者發(fā)過來的數(shù)據(jù),需要等Leader+ 和 isr 隊(duì)列里面所有的節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。-1 和 all等價(jià)。

spring:

kafka:

bootstrap-servers: 192.168.204.10:9092,192.168.204.10:9093,192.168.204.10:9094

consumer:

group-id: 1

value-deserializer: org.apache.kafka.common.serialization.StringSerializer

key-deserializer: org.apache.kafka.common.serialization.StringSerializer

producer:

acks: -1 #ack機(jī)制 0 1 -1

batch-size: 32768 #批次大小

value-serializer: org.apache.kafka.common.serialization.StringSerializer

key-serializer: org.apache.kafka.common.serialization.StringSerializer

compression-type: snappy #數(shù)據(jù)壓縮

retries: 5 #重試次數(shù)

buffer-memory: 33554432 #雙端隊(duì)列的緩沖區(qū)大小

linger-ms: 5 # sender 等待時(shí)間

數(shù)據(jù)重復(fù)

冪等性特性

配置:

enable:

idempotence: true #開啟冪等性 默認(rèn)開啟

但是Kafka掛掉之后會(huì)重新生成一個(gè)PID,所以也是有可能會(huì)產(chǎn)生重復(fù)數(shù)據(jù)。

生產(chǎn)者事務(wù)

開啟事務(wù)、必須得開啟冪等性

示例代碼:

private void transaction() {

//配置參數(shù)

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.10:9092,192.168.204.10:9093");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.LINGER_MS_CONFIG,5); //sender 發(fā)送的等待時(shí)間 ,當(dāng)達(dá)到這個(gè)時(shí)間的時(shí)候Sender 會(huì)直接發(fā)

properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); //開啟冪等性,默認(rèn)開啟

properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); //設(shè)置雙端隊(duì)列的大小 64m

properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32768); //批次的大小 32k ,當(dāng)批次達(dá)到這個(gè)大小的時(shí)候,Sender會(huì)直接發(fā)送

properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //數(shù)據(jù)的壓縮方式

properties.put(ProducerConfig.RETRIES_CONFIG,5); //發(fā)送失敗的重試次數(shù)

properties.put(ProducerConfig.ACKS_CONFIG,-1); // acks的方式 -1 當(dāng)leader 收到并且和isr 隊(duì)列里面所有的節(jié)點(diǎn)同步才應(yīng)答。

properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"123"); //事務(wù)唯一id

//自定義分區(qū)器

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());

//創(chuàng)建KafkaProducer

KafkaProducer kafkaProducer = new KafkaProducer<>(properties);

kafkaProducer.initTransactions(); //初始化事務(wù)

kafkaProducer.beginTransaction(); //開啟事務(wù)

try {

kafkaProducer.send(new ProducerRecord<>("second","hello"));

kafkaProducer.commitTransaction(); //事務(wù)

} catch (Exception e) {

e.printStackTrace();

kafkaProducer.abortTransaction();

}

//釋放資源

kafkaProducer.close();

}

數(shù)據(jù)有序

同分區(qū)內(nèi)消費(fèi)者可以實(shí)現(xiàn)數(shù)據(jù)的有序消費(fèi),不同分區(qū)內(nèi)消費(fèi)者如何實(shí)現(xiàn)有序消費(fèi)?TODO

數(shù)據(jù)亂序問題

產(chǎn)生的原因:

1、默認(rèn) broker 最多緩存5個(gè)請求

2、當(dāng)sender一直在發(fā)送數(shù)據(jù)的時(shí)候,當(dāng)有一條數(shù)據(jù)發(fā)送失敗需要返回雙端隊(duì)列進(jìn)行重發(fā),就會(huì)產(chǎn)生數(shù)據(jù)亂序的問題。

解決方案:

1) kafka 在 1.x 版本之前確保單分區(qū)下數(shù)據(jù)有序需要增加以下配置:

max.in.flight.requests.per.connection = 1

1) kafka在 1.x 以及之后的版本確保單分區(qū)下的額數(shù)據(jù)有序,條件如下:

(1) 未開啟冪等性

max.in.flight.requests.per.connection 設(shè)置為1

(2)開啟冪等性

max.in.flight.requests.per.connection 設(shè)置小于5

原理:在kafka1.x 版本以后,啟用冪等性后,kafka broker 會(huì)緩存producer 發(fā)來的最近5個(gè)request 的元數(shù)據(jù),如果數(shù)據(jù)亂序會(huì)將亂序的數(shù)據(jù)保存在內(nèi)存中,重新排序之后在落盤。

四、Broker

ZK存儲

啟動(dòng)zkCli.sh:

docker exec -it zookeeper-server bash

#進(jìn)入之后啟動(dòng)zkCli.sh

bin/zkCli.sh

ls /brokers/ids

get /brokers/topics/second/partitions/0/state

get /controller

/brokes/ids : 記錄有哪些節(jié)點(diǎn)

/brokers/topics/主題/patitions/0/state : 記錄著leader、isr隊(duì)列

/controller : 輔助選舉leader

Broker工作原理

AR: kafka 分區(qū)中所有的副本統(tǒng)稱

工作流程:

1) broker 啟動(dòng)會(huì)在zk中注冊

2) controller 誰注冊,誰說了算

3) 由選舉出來的controller 監(jiān)聽 brokers 節(jié)點(diǎn)變化

4) Controller 決定 Leader 的選舉

選舉規(guī)則:

在isr隊(duì)列中存活為前提,安裝ARa中排在最前面的優(yōu)先。例如 ar[1,0,2]、isr[1,0,2],那么leader 就會(huì)按照1,0,2的順序輪詢。

5) 主broker的Controller,會(huì)將所有節(jié)點(diǎn)的信息上傳到zk

6) 其他節(jié)點(diǎn)的controller 會(huì)去從zk同步相關(guān)信息下來。

7) 假設(shè)broker掛了

8) 監(jiān)聽到broker節(jié)點(diǎn)變化

9) 獲取isr

10) 選舉新的leader

11) 更新leader 以及 isr

新節(jié)點(diǎn)的服役以及退役(沒聽懂)

新節(jié)點(diǎn)服役

docker run -d --name kafka3 \

--network kafka-net \

-p 9095:9095 \

-e KAFKA_BROKER_ID=3 \

-e KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181 \

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.204.10:9095 \

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9095 \

-e TZ="Asia/Shanghai" \

wurstmeister/kafka:latest

查看在新節(jié)點(diǎn)是否有主題信息(指定這臺broker的地址,查看是否有主題信息)

kafka-topics.sh --bootstrap-server 192.168.204.10:9094 --topic first --describe

服役新節(jié)點(diǎn)、正確退役舊節(jié)點(diǎn)

五、Kafka 副本

基本信息

1)Kafka 副本作用: 提高數(shù)據(jù)的可靠性。

2)Kafka默認(rèn)的副本數(shù)為1,生產(chǎn)環(huán)境正常配置倆個(gè),保證數(shù)據(jù)的可靠性;太多副本會(huì)增加磁盤的存儲空間,增加網(wǎng)絡(luò)上數(shù)據(jù)傳輸,降低效率。

3)Kafka 中副本分為: Leader 和 Follower。Kafka生產(chǎn)者只會(huì)把數(shù)據(jù)發(fā)送到Leader,然后Follower 自己去找Leader 同步。

4)Kafka 分區(qū)中的所有副本統(tǒng)稱為AR(Assigned Replicas)。

AR = ISR + OSR

ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 長時(shí)間未向 Leader 發(fā)送通信請求或同步數(shù)據(jù),則該 Follower 將被踢出ISR。該時(shí)間閩值由 replica.lagtime.max.ms參數(shù)設(shè)定,默認(rèn) 30s。Leader 發(fā)生故障之后,就會(huì)從ISR 中選舉新的 Leader。

OSR,表示 Follower 與 Leader 副本同步時(shí),延遲過多的副本。

Leader的選舉流程

Follower的故障

Leader的故障

柚子快報(bào)激活碼778899分享:學(xué)習(xí)筆記 | Kafka

http://yzkb.51969.com/

參考閱讀

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18825346.html

發(fā)布評論

您暫未設(shè)置收款碼

請?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄